Hadoop

Hadoop3

入门

大数据概论

  • 海量数据的采集存储和分析
  • 特点(4v)
    • 大量
    • 高速
    • 多样
    • 低价值密度:快速对有价值数据提纯
  • 应用场景:通过海量数据分析为各个行业领域提供更强的决策力和指导性
  • 大数据的业务流程和部门分布
    • 数仓组
    • 实时组

Hadoop概论

  • 分布式系统基础架构,主要解决海量数据的存储和分析计算

  • 面试题

    • 端口号
    • 常用配置文件
    • HDFS文件块大小
    • HDFS读写流程
    • MapTask工作机制
    • ReduceTask工作机制
    • Yarn工作机制
    • Yarn调度器
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    一、Hadoop入门
    1、常用端口号
    hadoop3.x
    HDFS NameNode 内部通常端口:8020/9000/9820
    HDFS NameNode 对用户的查询端口:9870
    Yarn查看任务运行情况的:8088
    历史服务器:19888
    hadoop2.x
    HDFS NameNode 内部通常端口:8020/9000
    HDFS NameNode 对用户的查询端口:50070
    Yarn查看任务运行情况的:8088
    历史服务器:19888
    2、常用的配置文件
    3.x core-site.xml hdfs-site.xml yarn-site.xml mapred-site.xml workers
    2.x core-site.xml hdfs-site.xml yarn-site.xml mapred-site.xml slaves

    二、HDFS
    1、HDFS文件块大小(面试重点)
    硬盘读写速度
    在企业中 一般128m(中小公司) 256m (大公司)
    2、HDFS的Shell操作(开发重点)
    3、HDFS的读写流程(面试重点)
    三、Map Reduce
    1、InputFormat
    1)默认的是TextInputformat kv key偏移量,v :一行内容
    2)处理小文件CombineTextInputFormat 把多个文件合并到一起统一切片
    2、Mapper
    setup()初始化; map()用户的业务逻辑; clearup() 关闭资源;
    3、分区
    默认分区HashPartitioner ,默认按照key的hash值%numreducetask个数
    自定义分区
    4、排序
    1)部分排序 每个输出的文件内部有序。
    2)全排序: 一个reduce ,对所有数据大排序。
    3)二次排序: 自定义排序范畴, 实现 writableCompare接口, 重写compareTo方法
    总流量倒序 按照上行流量 正序
    5、Combiner
    前提:不影响最终的业务逻辑(求和 没问题 求平均值)
    提前聚合map => 解决数据倾斜的一个方法
    6、Reducer
    用户的业务逻辑;
    setup()初始化;reduce()用户的业务逻辑; clearup() 关闭资源;
    7、OutputFormat
    1)默认TextOutputFormat 按行输出到文件
    2)自定义
    四、Yarn
    1、Yarn的工作机制(面试题)

    2、Yarn的调度器
    1)FIFO/容量/公平
    2)apache 默认调度器 容量; CDH默认调度器 公平
    3)公平/容量默认一个default ,需要创建多队列
    4)中小企业:hive spark flink mr
    5)中大企业:业务模块:登录/注册/购物车/营销
    6)好处:解耦 降低风险 11.11 6.18 降级使用
    7)每个调度器特点:
    相同点:支持多队列,可以借资源,支持多用户
    不同点:容量调度器:优先满足先进来的任务执行
    公平调度器,在队列里面的任务公平享有队列资源
    8)生产环境怎么选:
    中小企业,对并发度要求不高,选择容量
    中大企业,对并发度要求比较高,选择公平。
    3、开发需要重点掌握:
    1)队列运行原理
    2)Yarn常用命令
    3)核心参数配置
    4)配置容量调度器和公平调度器。
    5)tool接口使用。
  • 优势

    • 高可靠
    • 高扩展
    • 高效:集群中多Hadoop并行工作
    • 高容错:自动分配失败任务
  • 组成

    • HDFS:数据存储
      • NameNode(nn):管理文件的元数据如文件名/属性/目录结构,及文件块列表及块所在的DataNode(hdfs集群中的老大)
      • DataNode(dn):在本地存储文件块数据及其校验和(hdfs集群中的小弟)
      • SecondaryNameNode(2nn):定时为NameNode备份,恢复数据的时候才用,不能保证完全数据恢复
    • YARN:资源调度
      • ResourceManager(rm):统筹管理每一台机器上的资源,并且负责接收处理客户端作业请求
      • NodeManager(nm):负责单独每一台机器的资源管理,实时保证和大哥(ResourceManager)通信
      • ApplicationMaster:针对每个请求job的抽象封装(单个任务的老大)
      • Container:将来运行在YARN上的每一个任务都会给其分配资源,Container就是当前任务所需资源的抽象封装(容器】)
    • MapReduce:数据计算分析
      • Map阶段:就是把需要计算的数据按照需求分成多个MapTask任务来执行
      • Reduce阶段: 把Map阶段处理完的结果拷贝过来根据需求进行汇总计算
  • 端口(面试题)

    • NameNode内部通信端口:8020/9000/9820
    • NameNode网页:9870
    • MapReduce查看执行任务端口:8088
    • 历史服务器通信端口:19888
  • 运行模式

    • 本地模式:默认安装后启动模式,将数据存在Linux本地,并且运行MR程序的时候也是在本地机器上运行
    • 伪分布式模式:在一台机器上启动HDFS集群,启动YARN集群,并且数据存在HDFS集群上,以及运行MR程序也是在YARN上运行,计算后的结果也是输出到HDFS上。本质上就是利用一台服务器中多个java进程去模拟多个服务
    • 完全分布式:多台机器上分别启动HDFS集群,启动YARN集群,并且数据存在HDFS集群上的以及运行MR程序也是在YARN上运行,计算后的结果也是输出到HDFS上
  • 目录结构

    • bin目录:存放对Hadoop相关服务(HDFS,YARN)进行操作的脚本
    • etc目录:Hadoop的配置文件目录,存放Hadoop的配置文件
    • lib目录:存放Hadoop的本地库(对数据进行压缩解压缩功能)
    • sbin目录:存放启动或停止Hadoop相关服务的脚本
    • share目录:存放Hadoop的依赖jar包、文档、和官方案例
  • 常用配置文件

    • core-site.xml
    • hdfs-site.xml
    • yarn-site.xml
    • mapred-site.xml
    • workers

运行环境搭建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
安装JDK
安装Hadoop
# 编辑/etc/profile
vim /etc/profile
# 添加环境变量(Hadoop为例)
export HADOOP_HOME=/hadoop-3.2.3
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin


# /etc/profile
if [ -d /etc/profile.d ]; then
for i in /etc/profile.d/*.sh; do
if [ -r $i ]; then
. $i
fi
done
unset i
fi
# 加载一次/etc/profile
source /etc/profile
# 编辑~/.bashrc
vim ~/.bashrc
# 添加代码
if [ -f /etc/bashrc ]; then
. /etc/bashrc
fi


source /etc/profile

本地运行模式

  • 需求:统计以文件中单词出现的次数
  • 实现步骤
    1. 在当前hadoop的安装目录创建一个文件
    2. 运行hadoop官方提供的wordcount案例
    3. hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar
    4. wordcount file:///wcinput file:///wcoutput

完全分布式

  1. 准备完全分布式需要的机器:hadoop1 hadoop2 hadoop3

  2. 编写集群分发脚本

    • scp:Secure Copy,完全拷贝内容,不做任何比较,如果目的地已有相关内容,它会进行覆盖

      1
      2
      3
      4
      5
      6
      7
      8
      9
      # 将hadoop102 上/opt/module 所有目录拷贝到 hadoop103的/opt/module
      scp -r ./* atguigu@hadoop103:/opt/module/
      # 在hadoop104上执行:将hadoop102的内容 拉取到hadoop104的指定位置
      scp -r atguigu@hadoop102:/opt/module/jdk1.8.0_212 /opt/module/
      # 在hadoop103上执行:将hadoop102的内容 发送给 hadoop104
      scp -r atguigu@hadoop102:/opt/module/hadoop-3.1.3 atguigu@hadoop104:/opt/module\
      # 将hadoop102 上的/opt/software/hadoop-3.1.3.tar.gz 同步给 hadoop103
      # 同步环境变量文件 /etc/profile.d/my_env.sh
      scp -r /etc/profile.d/my_env.sh root@hadoop103:/etc/profile.d/
    • rsync :远程同步工具,速度快,不会对重复文件进行拷贝

      • 注意:rsync 限制同步数据的时候只能有两台机器进行通信
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      # 安装
      apt install -y rsync
      rsync -av $pdir/$fname $host:$pdir
      # 为了省去配置环境变量的步骤,直接在/usr/local/bin下
      vim /usr/local/bin/xsync
      # 创建bin目录 把脚本创造 bin目录下 。
      # 脚本内容如下:

      #!/bin/bash
      # 参数预处理
      if [ $# -lt 1 ]
      then
      echo "参数不能为空!!!"
      exit
      fi
      # 遍历集群找那个所有的机器 进行内容分发
      for host in 172.17.0.2 172.17.0.3 172.17.0.4
      do
      echo "===============$host================"
      # 遍历所要分发的内容 进行传输
      for file in $@
      do
      #判断当前文件是否存在
      if [ -e $file ]
      then
      # 存在
      # 获取父级目录
      pdir=$(cd -P $(dirname $file); pwd)
      # 获取当前的文件名
      fname=$(basename $file)
      # 在目标服务器创建同级目录
      ssh $host "mkdir -p $pdir"
      # 同步文件
      rsync -av $pdir/$fname $host:$pdir
      else
      # 不存在
      echo "$file 不存在"
      exit
      fi
      done
      done
      # 更改xsync权限
      chmod +x xsync
  3. 规划集群

    • NameNode与SecondaryNameNode不要放在一台服务器上
    • ResourceManger很消耗内存,不要放在NN和2NN上
    Hadoop1 Hadoop2 Hadoop3
    HDFS NameNode DataNode DataNode SecondaryNameNode DataNode
    YARN NodeManager ResourceManager NodeManager NodeManager
  4. 配置文件

    • 默认配置文件
      • core-default.xml
      • hdfs-default.xml
      • mapread-default.xml
      • yarn-default.xml
    • 可自定义的配置文件
      • core-site.xml
      • hdfs-site.xml
      • mapread-site.xml
      • yarn-site.xml
    • Hadoop 中加载配置文件的顺序
      • 当Hadoop集群启动后,先加载默认配置,然后再加载自定义配置文件,自定义的配置信息会覆盖默认配置
  5. 配置集群并分发

    • hadoop-env.sh:主要映射jdk的环境变量,可以不配

    • core-site.xml:配置hadoop的全局信息

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      <configuration>
      <!-- 指定NameNode的地址 -->
      <property>
      <name>fs.defaultFS</name>
      <value>hdfs://bigdata1:9820</value>
      </property>
      <!-- 指定hadoop数据的存储目录 -->
      <property>
      <name>hadoop.tmp.dir</name>
      <value>/hadoop/data</value>
      </property>

      <!-- 配置HDFS网页登录使用的静态用户为root -->
      <property>
      <name>hadoop.http.staticuser.user</name>
      <value>root</value>
      </property>

      <!-- 配置该atguigu(superUser)允许通过代理访问的主机节点 -->
      <property>
      <name>hadoop.proxyuser.root.hosts</name>
      <value>*</value>
      </property>
      <!-- 配置该atguigu(superUser)允许通过代理用户所属组 -->
      <property>
      <name>hadoop.proxyuser.root.groups</name>
      <value>*</value>
      </property>
      <!-- 配置该atguigu(superUser)允许通过代理的用户-->
      <property>
      <name>hadoop.proxyuser.root.s</name>
      <value>*</value>
      </property>

      </configuration>
    • hdfs-site.xml

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      <configuration>
      <property>
      <name>dfs.webhdfs.enabled</name>
      <value>true</value>
      </property>
      <!-- nn web端访问地址-->
      <property>
      <name>dfs.namenode.http-address</name>
      <value>bigdata1:9870</value>
      </property>
      <!-- 2nn web端访问地址-->
      <property>
      <name>dfs.namenode.secondary.http-address</name>
      <value>bigdata3:9868</value>
      </property>
      </configuration>
    • mapread-site.xml

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      <configuration>
      <!-- 指定MapReduce程序运行在Yarn上 -->
      <property>
      <name>mapreduce.framework.name</name>
      <value>yarn</value>
      </property>
      </configuration>
      <property>
      <name>yarn.app.mapreduce.am.env</name>
      <value>HADOOP_MAPRED_HOME=hadoop</value>
      </property>
      <property>
      <name>mapreduce.map.env</name>
      <value>HADOOP_MAPRED_HOME=/hadoop</value>
      </property>
      <property>
      <name>mapreduce.reduce.env</name>
      <value>HADOOP_MAPRED_HOME=/hadoop</value>
      </property>
      <!-- 历史服务器端地址 -->
      <property>
      <name>mapreduce.jobhistory.address</name>
      <value>bigdata1:10020</value>
      </property>
      <!-- 历史服务器web端地址 -->
      <property>
      <name>mapreduce.jobhistory.webapp.address</name>
      <value>bigdata1:19888</value>
      </property>
    • yarn-site.xml

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      <configuration>
      <!-- 指定MR走shuffle -->
      <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>mapreduce_shuffle</value>
      </property>
      <!-- 指定ResourceManager的地址-->
      <property>
      <name>yarn.resourcemanager.hostname</name>
      <value>bigdata2</value>
      </property>
      <!-- 开启日志聚集功能 -->
      <property>
      <name>yarn.log-aggregation-enable</name>
      <value>true</value>
      </property>
      <!-- 设置日志聚集服务器地址 -->
      <property>
      <name>yarn.log.server.url</name>
      <value>http://bigdata1:19888/jobhistory/logs</value>
      </property>
      <!-- 设置日志保留时间为7天 -->
      <property>
      <name>yarn.log-aggregation.retain-seconds</name>
      <value>604800</value>
      </property>
  6. 配置workers

    1
    2
    3
    4
    # 不能有空格和空行
    172.17.0.2
    172.17.0.3
    172.17.0.4
  7. 分发配置

    1
    xsync /hadoop/etc/hadoop/
  8. 启动集群

    • 启动HDFS集群

      • 只有首次启动HDFS需要对NameNode进行格式化操作
      • 格式化NameNode,会产生新的集群id,导致NameNode和DataNode的集群id不一致,集群找不到已往数据。如果集群在运行过程中报错,需要重新格式化NameNode的话,一定要先停止namenode和datanode进程,并且要删除所有机器的data和logs目录,然后再进行格式化
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      	# 在hadoop1执行
      hdfs namenode -format
      # 启动集群:统一启动
      /hadoop/sbin/start-dfs.sh
      # 若试图以根用户身份在hdfs namenode上操作,但是没有定义HDFS_ NAMENODE_用户。中止操作
      # 进入环境变量
      vim /etc/profile
      export HDFS_NAMENODE_USER=root
      export HDFS_DATANODE_USER=root
      export HDFS_JOURNALNODE_USER=root
      export HDFS_SECONDARYNAMENODE_USER=root
      export HDFS_ZKFC_USER=root
      export YARN_RESOURCEMANAGER_USER=root
      export YARN_NODEMANAGER_USER=root
      # 生效环境变量
      source /etc/profile

      # 单独节点启动namenode
      hdfs --daemon start datanode
      # 单独节点启动secondarynamenode
      hdfs --daemon start secondarynamenode
    • 启动YARN集群

      1
      2
      3
      4
      5
      6
      7
      # hadoop2上启动
      /hadoop/sbin/start-yarn.sh

      # 单独节点启动resourcemanager
      yarn --daemon start resourcemanager
      # 单独节点启动nodemanager
      yarn --daemon start nodemanager
  9. 测试集群 (官方的wordcount案例在集群上跑一遍)

    1
    2
    3
    # 运行官方wordcount案例
    hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount file:///wcinput file:///wcoutput
    hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount hdfs://hadoop102:9820/wcinput hdfs://hadoop102:9820/wcoutput
  10. 崩溃解决方式

    1. 暂停服务
    2. 删除每个节点上的data和logs
    3. 格式化
  11. 配置历史服务器

    • 历史服务器是针对MR程序执行的历史记录

    • 配置mapred-site.xml

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      <!-- 历史服务器端地址 -->
      <property>
      <name>mapreduce.jobhistory.address</name>
      <value>bigdata1:10020</value>
      </property>
      <!-- 历史服务器web端地址 -->
      <property>
      <name>mapreduce.jobhistory.webapp.address</name>
      <value>bigdata1:19888</value>
      </property>
    • 分发

    • 在bigdata1启动服务

      1
      mapred --daemon start historyserver
  12. 配置日志聚集

    • 所有节点的日志聚集起来

    • 日志是针对 MR 程序运行是所产生的的日志,方便后期分析问题 有更好的 执行过过程的依据

    • 日志聚集后,会在HDFS的 /tmp目录下存储

    • 配置 yarn-site.xml

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      <!-- 开启日志聚集功能 -->
      <property>
      <name>yarn.log-aggregation-enable</name>
      <value>true</value>
      </property>
      <!-- 设置日志聚集服务器地址 -->
      <property>
      <name>yarn.log.server.url</name>
      <value>http://bigdata1:19888/jobhistory/logs</value>
      </property>
      <!-- 设置日志保留时间为7天 -->
      <property>
      <name>yarn.log-aggregation.retain-seconds</name>
      <value>604800</value>
      </property>
    • 分发重启

  13. 配置Hadoop集群的时间服务器(了解)

    • 在集群当中,所有服务器时间基本都是一致,所以要求进行时间同步
    • 一般那情况服务器本身可以通过同步网络时间达到时间一致的要求
    • 一些特殊情况下 需要自己配置时间服务器(不连外网的情况)
    • 配置时间服务器器的步骤:参考word文档的的步骤!!
  14. Hadoop编译源码

    • 编译源码为了整合hadoop和其他框架技术或者服务器环境的兼容。
  15. 总结

    • 格式问题
      • 启动namenode后再启动datanode发起datanode无法启动
      • 正常情况下集群只需要在首次启动的时候需要格式化NameNode
      • 如果以后想格式,必须先要把集群中的所有机器的data logs都删除
    • 用户登录问题

常用脚本

分发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 安装
apt install -y rsync
rsync -av $pdir/$fname $host:$pdir
# 为了省去配置环境变量的步骤,直接在/usr/local/bin下
vim /usr/local/bin/xsync
# 创建bin目录 把脚本创造 bin目录下 。
# 脚本内容如下:

#!/bin/bash
# 参数预处理
if [ $# -lt 1 ]
then
echo "参数不能为空!!!"
exit
fi
# 遍历集群找那个所有的机器 进行内容分发
for host in 172.17.0.2 172.17.0.3 172.17.0.4
do
echo "===============$host================"
# 遍历所要分发的内容 进行传输
for file in $@
do
#判断当前文件是否存在
if [ -e $file ]
then
# 存在
# 获取父级目录
pdir=$(cd -P $(dirname $file); pwd)
# 获取当前的文件名
fname=$(basename $file)
# 在目标服务器创建同级目录
ssh $host "mkdir -p $pdir"
# 同步文件
rsync -av $pdir/$fname $host:$pdir
else
# 不存在
echo "$file 不存在"
exit
fi
done
done
# 更改xsync权限
chmod +x xsync

集群启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
vim /usr/local/bin/myhadoop.sh

#!/bin/bash
if [ $# -lt 1 ]
then
echo "No Args Input..."
exit ;
fi
case $1 in
"start")
echo " =================== 启动 hadoop集群 ==================="
echo " --------------- 启动 hdfs ---------------"
ssh bigdata1 "/hadoop/sbin/start-dfs.sh"
echo " --------------- 启动 yarn ---------------"
ssh bigdata2 "/hadoop/sbin/start-yarn.sh"
echo " --------------- 启动 historyserver ---------------"
ssh bigdata1 "/hadoop/bin/mapred --daemon start historyserver"
;;
"stop")
echo " =================== 关闭 hadoop集群 ==================="
echo " --------------- 关闭 historyserver ---------------"
ssh bigdata1 "/hadoop/bin/mapred --daemon stop historyserver"
echo " --------------- 关闭 yarn ---------------"
ssh bigdata2 "/hadoop/sbin/stop-yarn.sh"
echo " --------------- 关闭 hdfs ---------------"
ssh bigdata1 "/hadoop/sbin/stop-dfs.sh"
;;
*)
echo "Input Args Error..."
;;
esac

chmod +x myhadoop.sh

查看集群进程

1
2
3
4
5
6
7
8
9
10
vim /usr/local/bin/jpsall

#!/bin/bash
for host in bigdata1 bigdata2 bigdata3
do
echo =============== $host ===============
ssh $host jps $@ | grep -v Jps
done

chmod +x jpsall

HDFS

概述

  • 分布式扽见管理系统的一种
  • 优点
    • 容错性高,适合海量数据的存储和管理
  • 缺点
    • 仅支持数据的追加,不支持直接修改
    • 对小文件比较敏感
    • 不适合低延时的数据访问
  • 文件块大小
    • 默认128M(企业中一般使用128M或256M)
    • 寻址时间是传输时间的1% 为最佳状态,大小设置取决于存储磁盘速率
      • 寻址时间:查找到目标块的时间
      • 设置太小会增加寻址时间
      • 设置太大会增加传输数据时间

Shell操作

  • 基本命令

    1
    2
    3
    4
    hadoop fs 具体命令
    hdfs dfs 具体命令
    # 以上命令等价
    hadoop fs -help 需要查询的命令
  • 上传

    1
    2
    3
    4
    5
    6
    7
    # 从本地剪切粘贴到HDFS
    hdfs dfs -moveFromLocal /本地文件 /目标路径
    # 从本地文件系统中拷贝文件到HDFS路径去
    hdfs dfs -copyFromLocal /本地文件 /目标路径
    hdfs dfs -put /本地文件 /目标路径
    # 追加一个文件到已经存在的文件末尾
    hdfs dfs -appendToFile /本地文件 /目标文件
  • 下载

    1
    2
    3
    4
    5
    # 从HDFS拷贝到本地
    hdfs dfs -copyToLocal /目标文件 /本地路径
    hdfs dfs -get /目标文件 /本地路径
    # 合并下载多个文件
    hdfs dfs -getmerge /目标路径/* /本地路径/文件
  • 直接操作HDFS

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    # 显示一个文件的末尾1kb的数据
    hdfs dfs -tail /目标文件
    # 删除文件或文件夹
    hdfs dfs -rm /目标文件
    # 删除空目录
    hdfs dfs -mkdir /目标目录
    # 统计文件夹的大小信息
    hdfs dfs -du -s -h /目标文件
    # 设置HDFS中文件的副本数量,实际数量依赖DataNode数量
    hdfs dfs -setrep 数量 /目标文件
    # 其他同linux

客户端API

  • pom

    1
    2
    3
    4
    5
    <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.3.1</version>
    </dependency>
  • resources目录下可配置hdfs-site.xml

    • 优先级:代码中的配置 > resources目录hdfs-site.xml > 服务器hdfs-site.xml > 服务器hdfs-default.xml
    1
    2
    3
    4
    5
    6
    7
    8
    9
    <?xml version="1.0" encoding="UTF-8"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

    <configuration>
    <property>
    <name>dfs.replication</name>
    <value>1</value>
    </property>
    </configuration>
  • 业务代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    public class HdfsClient {

    private FileSystem fs;

    @Before
    public void init() throws URISyntaxException, IOException, InterruptedException {
    // 获取文件系统
    Configuration configuration = new Configuration();
    configuration.set("dfs.replication", "2");
    // 配置在集群上运行
    fs = FileSystem.get(new URI("hdfs://bigdata1:9820"), configuration, "root");
    }

    @After
    public void close() throws IOException {
    fs.close();
    }

    // 创建目录
    @Test
    public void testMkdirs() throws IOException {
    fs.mkdirs(new Path("/xiyou/huaguoshan"));
    }

    // 上传
    @Test
    public void testPut() throws IOException {
    fs.copyFromLocalFile(false, true, new Path("/Users/bofei/IdeaProjects/hadoop/src/main/resources/sunwukong.txt"), new Path("/xiyou/huaguoshan"));
    }

    // 下载
    @Test
    public void testGet() throws IOException {
    fs.copyToLocalFile(false, new Path("/xiyou/huaguoshan/sunwukong.txt"), new Path("/Users/bofei/IdeaProjects/hadoop/src/main/resources/sunwukong.txt"), false);
    }

    // 删除
    @Test
    public void testRm() throws IOException {
    // 参数2:是否递归
    fs.delete(new Path("/xiyou/huaguoshan/sunwukong.txt"), false);
    // 删除空目录
    fs.delete(new Path("/xiyou"), false);
    // 删除非空目录
    fs.delete(new Path("/xiyou"), true);
    }

    // 文件移动
    @Test
    public void testMv() throws IOException {
    fs.rename(new Path("/xiyou/huaguoshan/sunwukong.txt"), new Path("/xiyou/huaguoshan1/sunwukong.txt"));
    // 更改目录名
    fs.rename(new Path("/input"), new Path("output"));
    }

    // 查看详细信息
    @Test
    public void fileDetail() throws IOException {
    // 获取所有文件信息,参数2:是否递归
    RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path("/"), true);
    while (iterator.hasNext()) {
    LocatedFileStatus fileStatus = iterator.next();
    System.out.println("文件名:" + fileStatus.getPath().getName());
    System.out.println("文件路径:" + fileStatus.getPath());
    System.out.println("文件权限:" + fileStatus.getPermission());
    System.out.println("文件块大小:" + fileStatus.getBlockSize());
    // ...
    // 获取块信息
    BlockLocation[] blockLocations = fileStatus.getBlockLocations();
    System.out.println(Arrays.toString(blockLocations));
    }
    }

    // 判断文件夹或文件
    @Test
    public void testFile() throws IOException {
    FileStatus[] fileStatuses = fs.listStatus(new Path("/"));
    for (FileStatus fileStatus : fileStatuses) {
    System.out.println((fileStatus.isFile() ? "文件" : "目录") + fileStatus.getPath().getName());
    }
    }

    }

读写流程(面试)

写数据

  • 流程

    1. Client创建分布式文件系统并向NameNode请求上传文件
    2. NameNode检查权限和目录结构,若可以则返回允许响应
    3. Client请求上传一个Block,请求返回DataNode
    4. NameNode选择多个节点并返回(优先选择本地节点)
    5. Client创建数据流进行写数据(Client与其中一个DataNode进行写数据,由该节点进行数据边写边分发)
    • 传输数据单位为Packet(64k)

    • Packet由多个【chunk(512byte)+ 校验位(4byte)】所构成

    1. 写完数据后DataNode从后向前最后到Client完成应答
  • 机架感知:HDFS根据请求返回DataNode的节点的策略

    • 若当前Client所在机器有DataNode节点则返回当前机器DN1,否则从集群中随机一台

    • 根据第一台机器的位置,再其他机架上随机一台,在第二台机器所在机架上再随机一台

    • 保证数据的可靠性和传输的效率

  • 网络拓扑:客户端建立传输通道确定和一台DataNode先建立连接的方法

    • 找离Client最近的一台机器先建立通道(共同祖先距离最少)
    • Client以串行的方式建立通道,降低Client的IO开销
    • 采用了ack回执的策略保证数据完整成功上传

读数据

  • 流程
    1. Client创建分布式文件系统并向NameNode请求下载文件
    2. NameNode检查权限和目录结构,若可以则数据元数据
    3. Client创建流,根据距离和负载选择节点串行读取数据

NN和2NN(了解)

  • 元数据信息保存位置: 磁盘 + 内存
    • 保存到磁盘:读写速度慢,效率低

    • 保存内存:数据不安全

  • 内存和磁盘中的元数据进行同步的方法(元数据的维护策略)
    • 先记账再放到内存中
    • 对元数据进行操作时,首先在内存进行合并,其次把相关操作记录追加到edits编辑日志文件中,满足一定条件下,将edits文件中的记录合并到元数据信息文件中fsimage
  • 2NN负责对NN的元数据信息进行合并
    • 2NN主要负责对NN的元数据精心合并,满足一定条件,2NN会检测本地时间,每隔一个小时会主动对NN的edits文件和fsimage文件进行一次合并
    • 合并时首先会通NN,NN会停止对正在使用的edits文件追加,同时新建新的edits编辑日志文件保证NN的正常工作。2NN把NN本地的fsimage文件和edits编辑日志拉取2NN的本地,在内存中对二者进行合并,产生最新fsimage文件。把新的fsimage文件发送给NN
    • 当NN的edits文件中的操作次数累计达到100万次,即便还没到1小时2NN也会进行合并(2NN每隔60秒会检测一次NN方的edits文件的操作次数)
    • 2NN会自己把最新的fsimage文件备份一份

DN(了解)

  • 流程
    1. DateNode启动后向NameNode注册(正确的块信息)
    2. 每6小时汇报一次(块完好)
    3. 心跳3秒一次(活着)
    4. 超过10分钟30秒没收到心跳则认为该节点不可用
  • 数据完整性:采用CRC32校验

MapReduce

概述

  • 定义

    • 分布式运算程序的编程框架
    • 将用户编写的业务逻辑融代码和自带默认组件整合成一个完整的分布式运算程序
  • 优点

    • 易于编写:用户只关心业务逻辑
    • 高扩展性:扩展集群机器增加计算能力
    • 高容错性:MR如果执行失败会重新分配任务
    • 适合做海量数据的离线处理
  • 缺点

    • 不擅长实时计算:Mysql
    • 不擅长流式计算:Spark,Flink
  • 核心思想

    • Map阶段: 将文件进行逻辑划分(分片,128M)后,进行分割处理
      • 并行运行互不相干
    • Reduce阶段:将Map阶段处理好的数据进行汇总
      • 互不相干
  • 数据类型

    • 参照Java的来记忆
      • Text == String
      • IntWritable == int
  • 编程规范

    • Mapper阶段
      • 自定义Mapper类并继承Hadoop提供的Mapper
      • Mapper输入输出都是键值对形式
      • 重写map方法,在该方法中实现业务逻辑的编写
      • map方法对每个键值对调用一次
    • Reducer阶段
      • 自定义Reducer类并继承Hadoop提供的Reducer
      • Reducer输入类型对应Mapper输出类型
      • 重写reduce方法,在该方法中实现业务逻辑的编写
    • Driver阶段
      • 提交到Yarn进行运行
  • 提交方式

    • 本地运行 (自测场景)

    • 打包后上传到集群运行 yarn上(注意输入路径和输出路径的写法)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      <!-- 打包插件 -->
      <build>
      <plugins>
      <plugin>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.6.1</version>
      <configuration>
      <source>1.8</source>
      <target>1.8</target>
      </configuration>
      </plugin>
      <!-- 带依赖 -->
      <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
      <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
      </descriptorRefs>
      </configuration>
      <executions>
      <execution>
      <id>make-assembly</id>
      <phase>package</phase>
      <goals>
      <goal>single</goal>
      </goals>
      </execution>
      </executions>
      </plugin>
      </plugins>
      </build>
      1
      hadoop jar (jar包路径) (运行程序的全类名) 输入路径 输出路径
    • Windows上直接将job提交的集群上运行(了解)

序列化

  • 优点

    • 紧凑,快速

      • Java Serializable采用各种校验信息/头/继承体系
      • Hadoop采用简单校验
    • 互操作性:序列化和反序列化可采用不同语言

  • 步骤

    1. 实现Wirtable接口
    2. 提交无参构造:用于反序列化过程(不写也行)
    3. 重写序列化方法 write()
    4. 重写反序列化的方法 readFields()
    5. 序列化和反序列化时属性的读写顺序一定要保持一致
    6. 若自定义bean需要在key中传输则需要实现Comparable接口
    7. 重写toString():能灵活的控制做种对象输出的结果
  • 序列化的案例操作:统计手机对应流量情况

原理

  • 简易版: InputFormat → Mapper → Reducer → OutputFormat

  • 详细版: InputFormat → map sort → copy sort reduce → OutputFormat

InputFormat

  • 切片

    • 文件逻辑分片,Map阶段并行度由切片数决定
    • 一个切片产生一个MapTask,片多大一个MapTask的处理的数据就多大
    • 切片时只考虑文件本身,不考虑数据的整体集
    • 切片大小和切块大小默认一致,避免跨机器读取数据
      • 数据块:HDFS存储数据单位,物理
      • 数据切片:MapReduce计算输入数据的单位
    • 每个文件单独切片
    • 剩余文件大小 大于 切片大小的1.1倍才继续切片
  • 体系结构

    • FileInputFormat
      • nputFormat的子实现类,实现切片逻辑
      • getSplits()负责切片
    • TextInputFormat
      • 安行读取
      • FileInputFormat的子实现类,实现读取数据的逻辑
      • createRecordReader()返回一个RecordReader实现
    • CombineFileInputFormat
      • 适用于多个小文件计算场景:将多个小文件从逻辑上划到一个切片中
      • FileInputFormat的子实现类,此类中实现了一套切片逻辑
  • 默认切片规则(源码角度分析)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    // 切片源码
    public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
    // minSize = 1(默认情况)
    // 但是我们也可以通过改变mapreduce.input.fileinputformat.split.minsize 配置项来改变minSize大小
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    // maxSize = Long的最大值(默认情况)
    // 但是我们也可以通过改变mapreduce.input.fileinputformat.split.maxsize 配置项来改变maxSize大小
    long maxSize = getMaxSplitSize(job);
    // 管理最终切完片的对象的集合 最终返回的就是此集合
    List<InputSplit> splits = new ArrayList<InputSplit>();
    // 获取当前文件的详情
    List<FileStatus> files = listStatus(job);
    boolean ignoreDirs = !getInputDirRecursive(job)
    && job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
    // 遍历获取到的文件列表,一次按照文件为单位进行切片
    for (FileStatus file: files) {
    // 如果是忽略文件以及是文件夹就不进行切片
    if (ignoreDirs && file.isDirectory()) {
    continue;
    }
    // 获取文件的路径
    Path path = file.getPath();
    // 获取文件的内容大小
    long length = file.getLen();
    // 如果不是空文件 继续切片
    if (length != 0) {
    // 获取文件的具体的块信息
    BlockLocation[] blkLocations;
    if (file instanceof LocatedFileStatus) {
    blkLocations = ((LocatedFileStatus) file).getBlockLocations();
    } else {
    FileSystem fs = path.getFileSystem(job.getConfiguration());
    blkLocations = fs.getFileBlockLocations(file, 0, length);
    }
    // 判断是否要进行切片(主要判断当前文件是否是压缩文件,有一些压缩文件时不能够进行切片)
    if (isSplitable(job, path)) {
    // 获取HDFS中的数据块的大小
    long blockSize = file.getBlockSize();
    // 计算切片的大小--> 128M 默认情况下永远都是块大小
    long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    -- 内部方法:
    protected long computeSplitSize(long blockSize, long minSize,
    long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
    }
    long bytesRemaining = length;
    // 判断当前的文件的剩余内容是否要继续切片 SPLIT_SLOP = 1.1
    // 判断公式:bytesRemaining)/splitSize > SPLIT_SLOP
    // 用文件的剩余大小/切片大小 > 1.1 才继续切片(这样做的目的是为了让我们每一个MapTask处理的数据更加均衡)
    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
    int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
    splits.add(makeSplit(path, length-bytesRemaining, splitSize,
    blkLocations[blkIndex].getHosts(),
    blkLocations[blkIndex].getCachedHosts()));
    bytesRemaining -= splitSize;
    }
    // 如果最后文件还有剩余且不足一个切片大小,最后再形成最后的一个切片
    if (bytesRemaining != 0) {
    int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
    splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
    blkLocations[blkIndex].getHosts(),
    blkLocations[blkIndex].getCachedHosts()));
    }
    } else { // not splitable
    if (LOG.isDebugEnabled()) {
    // Log only if the file is big enough to be splitted
    if (length > Math.min(file.getBlockSize(), minSize)) {
    LOG.debug("File is not splittable so no parallelization "
    + "is possible: " + file.getPath());
    }
    }
    splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
    blkLocations[0].getCachedHosts()));
    }
    } else {
    //Create empty hosts array for zero length files
    splits.add(makeSplit(path, 0, length, new String[0]));
    }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()
    + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits;
    }

Shuffle

  • Map方法后Reduce方法前的数据处理过程

  • 流程

    1. MapTask执行完毕后使用getPatition()方法标记数据是哪个分区
    2. 进入环形缓冲区:默认100M,左侧存索引右侧存数据
    3. 到达容量的80%后对Key的索引按照字典顺序进行快排
    4. 反向溢写得到spill.index和spill.out文件(Combiner为可选流程)
    5. 对溢写出的数据进行归并排序
    6. (Combiner为可选流程)
    7. (压缩为可选流程)
    8. 写到磁盘上等待ReduceTask进行拉取并放入内存中,内存不足则溢写到磁盘上
    9. 对每个map来的数据进行归并排序
    10. 按照相同Key分组进入reduce()方法
  • Partitioner

    • 是Hadoop的分区器对象,给Map阶段输出数据选择分区的功能

    • 默认实现HashPartitioner类

    • 按照key的hashCode值对ReduceTask数量进行取余,得到数字即为当前kv所属分区的编号,分区编号在Job提交时就已根据ReduceTask的数量定义好

    • 默认分区规则源码解析

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      // 定位MapTask的map方法中 context.write(outk, outv);
      // 跟到write(outk, outv)中 进入到 ChainMapContextImpl类的实现中
      public void write(KEYOUT key, VALUEOUT value) throws IOException,
      InterruptedException {
      output.write(key, value);
      }
      // 跟到 output.write(key, value) 内部 NewOutputCollector
      public void write(K key, V value) throws IOException, InterruptedException {
      collector.collect(key, value,
      partitioner.getPartition(key, value, partitions));
      }
      // 重点理解 partitioner.getPartition(key, value, partitions);
      // 跟进默认的分区规则实现 HashPartitioner类
      public int getPartition(K key, V value,
      int numReduceTasks) {
      // 根据当前的key的hashCode值和ReduceTask的数量进行取余操作
      // 获取到的值就是当前kv所属的分区编号。
      return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
      }
    • 自定义分区器对象

      • 步骤

        1. 继承Hadoop提供的Partitioner类,实现getPartition()方法,编写业务逻辑

        2. Job驱动设置Partitioner为自定义类

        3. Job驱动设置ReduceTask数量

      • 注意事项

        • 从0开始,依次累加

        • ReduceTask数量 > 实际分区数:生成空的分区文件

        • ReduceTas数量设 < 实际分区数:报错

        • ReduceTask数量 = 1:输出到一个文件

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          // 获取当前ReduceTask的数量
          partitions = jobContext.getNumReduceTasks();
          // 判断ReduceTask的数量 是否大于1,找指定分区器对象
          if (partitions > 1) {
          partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
          ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
          } else {
          // 执行默认的分区规则,最终返回一个唯一的0号分区
          partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
          @Override
          public int getPartition(K key, V value, int numPartitions) {
          return partitions - 1;
          }
          };
          }
  • 比较排序

    • MapTast和ReduceTask均会按照Key进行排序,是默认行为

    • 分类

      • 部分排序:根据Key对数据排序,保证输出的每个文件内部有序

      • 全排序:最终输出结果只有一个文件,文件内部有序

      • 二次排序:自定义排序范畴,需实现WritableCompare接口

    • 实现方式

      • 方式1
        1. 让参与比较的对象实现WritableComparable接口,并在该类中实现compareTo()方法定义比较规则
        2. 运行时Hadoop会生自动成比较器对象WritableComparator
      • 方式2
        1. 自定义比较器对象,继承Hadoop的WritableComparator类,重写compare() 方法定义比较规则
        2. 调用父类的super()方法将自定义的比较器对象和参与比较的对象进行关联
        3. 在Driver类中指定自定义的比较器对象
    • 获取规则

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      // 定位到 MapTask 类中的init() 方法
      // 获取比较器对象
      comparator = job.getOutputKeyComparator();
      // 定位 JobConf 类中 getOutputKeyComparator()
      // job提交的时候 获取当前MR程序输出数据key的比较器对象
      public RawComparator getOutputKeyComparator() {
      Class<? extends RawComparator> theClass = getClass(
      JobContext.KEY_COMPARATOR, null, RawComparator.class);
      if (theClass != null){
      // 如果通过配置获取到指定的比较器对象的class 直接通过反射示例化
      return ReflectionUtils.newInstance(theClass, this);
      }
      // 如果通过配置没获取到指定的比较器对象,接着判断
      // 当前参与比较的对象是否实现了WritableComparable接口
      return WritableComparator.get(getMapOutputKeyClass()
      .asSubclass(WritableComparable.class), this);
      }
      // get()方法就是实现获取比较器对象的逻辑
      public static WritableComparator get(
      Class<? extends WritableComparable> c, Configuration conf) {
      // 根据当前传入的class文件到 comparators的Map中获取比较器对象
      // 这种情况是 当前参与比较的对象的类型是Hadoop自身的数据类型
      WritableComparator comparator = comparators.get(c);
      if (comparator == null) {
      // 考虑到一些极端情况,可能发生GC垃圾回收,导致比较器没了
      // 为了万无一失 再次让类加载一遍
      forceInit(c);
      // 重新加载后再次获取
      comparator = comparators.get(c);
      // 此时还没获取到,那就说明当前参与比较的对象的不是Hadoop自身的数据类型
      if (comparator == null) {
      // Hadoop 会给当前参与比较的对象生成比较器对象
      comparator = new WritableComparator(c, conf, true);
      }
      }
      // Newly passed Configuration objects should be used.
      ReflectionUtils.setConf(comparator, conf);
      return comparator;
      }
    • Hadoop自身的数据类型是如何拥有比较器对象

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      // 以Text为例
      // 当前Text实现了WritableComparable接口
      // 在该类中 定义了自己的比较器对象
      // 该类中还包含一个静态代码快
      static {
      // register this comparator
      WritableComparator.define(Text.class, new Comparator());
      }

      // Text类它的比较器对象被管理到一个Map中,以当前类的class文件为key
      // 当前类的比价器对象为value
      public static void define(Class c, WritableComparator comparator) {
      comparators.put(c, comparator);
      }
  • Combiner

    • 是Mapper和Reducer之外的组件

      • Combiner在每个MapTask节点运行
      • Reducer接收所有Mapper输出结果
    • 提升MR运行效率,减轻ReduceTask压力,减少IO开销

    • 若没有Reduce阶段则没有Shuffle阶段也就没有Combiner阶段

    • 运行的前提是不能影响最终业务逻辑

    • 步骤

      1. 若使复用Reducer不会影响业务逻辑则直接指定Combiner类为Reducer类
      2. 自定一个Combiner类 继承Hadoop提供的Reducer
      3. 在job中指定自定义的Combiner类

OutputFormat

  • 负责最终数据的写出

  • 体系结构

    • FileOutputFormat
      • OutputFormat的实现类,对checkOutputSpecs()做了具体的实现
    • TextOutputFormat:按行写,默认
      • FileOutputFormat的子类,对getRecordWriter()做了具体实现
  • 使用场景

    • 对MR最终的结果有个性化制定的需求,就可以通过自定义OutputFormat来实现
  • 自定义步骤

    1. 自定义类继承Hadoop提供的OutputFormat,在该类中实现getRecordWriter(),返回RecordWriter
    2. 自定义一个 RecordWriter 并且继承Hadoop提供的RecordWriter类,在该类中重写 write()close()在这些方法中完成自定义输出

工作流程

  • MapTask
    1. 前置
      1. MergeClient对数据进行切片划分后提交给Yarn
      2. Yarn启动MrAppMaster后开启对应的MapTask
    2. Read:由InputFormat调用RecorderReader读取数据
      • 默认按行输入
    3. Map:自定义Map业务逻辑
    4. Collect:环形缓冲区的分区和排序
      • 默认按照Key的Hash值%Reducetask个数
      • MapTask执行完毕后使用getPatition()方法标记数据是哪个分区
      • 进入环形缓冲区:默认100M,左侧存索引右侧存数据
      • 到达容量的80%后对Key的索引按照字典顺序进行快排
    5. 溢写:从环形缓冲区反向溢写
      • 得到spill.index和spill.out文件
      • Combiner为可选流程,提前聚合,解决数据倾斜
    6. Merge:对溢写文件进行归并排序
  • ReduceTask
    1. Copy:拉取自己指定分区的数据
    2. Sort:对拉取的数据归并排序
    3. Reduce:自定义Reduce业务逻辑后由OutputFormat输出数据
      • 默认按行输出

并行度决定机制

  • MapTask
    • 由切片数决定
    • 一个切片产生一个MapTask,片多大一个MapTask的处理的数据就多大
    • 切片时只考虑文件本身,不考虑数据的整体集
    • 切片大小和切块大小默认一致,避免跨机器读取数据
      • 数据块:HDFS存储数据单位,物理
      • 数据切片:MapReduce计算输入数据的单位
    • 每个文件单独切片
    • 剩余文件大小 > 切片大小的1.1倍才继续切片
  • ReduceTask
    • 由业务逻辑和集群性能而定
    • ReduceTask=0代表没有Reduce阶段,输出文件个数和Map个数一致
    • 默认值为1,输出文件1个
    • 数据分布不均匀可能导致Reduce阶段数据倾斜

Join

  • ReduceJoin

    • MR程序计算数据时出现多个输入文件之间存在关联,需要在计算中通过两个文件之间相互关联才能获取最终的计算结果

    • 思想

      • 分析文件之间的关系,然后定位关联字段
      • 在Map阶段对多个文件进行数据合并,并且让关联字段作为输出数据的key
      • 当一组相同key的values进入Reduce阶段的reduce方法中第一步:先把两个文件数据分离出来,分别放到各自的对象中维护
      • 把当前一组维护好的数据进行关联操作,得到想要的数据结果
  • MapJoin

    • 考虑MR整体的执行效率,业务场景是一个大文件和一个小文件关联操作,可以使用MapJoin实现
    • 是解决ReduceJoin数据倾斜问题有效的办法
    • 思想
      • 分析文件之间的关系,然后定位关联字段
      • 将小文件的数据映射到内存中的一个容器维护起来
      • 当MapTask处理大文件的数据时,每读取一行数据,就根据当前行中的关联字段到内存的容器里获取对象的信息
      • 封装结果将其输出
  • Job提交流程源码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    // 定位 job.waitForCompletion(true); 
    // 跟到 waitForCompletion() 中
    public boolean waitForCompletion(boolean verbose
    ) throws IOException, InterruptedException,
    // 判断当前Job的状态是否为定义阶段 ClassNotFoundException {
    if (state == JobState.DEFINE) {
    // 提交方法
    submit();
    }
    if (verbose) {
    monitorAndPrintJob();
    } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
    Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
    try {
    Thread.sleep(completionPollIntervalMillis);
    } catch (InterruptedException ie) {
    }
    }
    }
    return isSuccessful();
    }
    // 进入submit() 方法
    public void submit()
    throws IOException, InterruptedException, ClassNotFoundException {
    // 确认当前Job的状态
    ensureState(JobState.DEFINE);
    // 新老API的兼容
    setUseNewAPI();
    // 连接集群(如果是本地模式结果就是LocalRunner, 如果Yarn集群结果就是YARNRuuner)
    connect();
    // 开始提交Job
    final JobSubmitter submitter =
    getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
    ClassNotFoundException {
    // 提交Job
    return submitter.submitJobInternal(Job.this, cluster);
    }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
    }
    // 进入 submitJobInternal()
    /*
    * Internal method for submitting jobs to the system.
    *
    * <p>The job submission process involves:
    * <ol>
    * <li>
    检测输入输出路径的合法性
    * Checking the input and output specifications of the job.
    * </li>
    * <li>
    给当前Job计算切片信息
    * Computing the {@link InputSplit}s for the job.
    * </li>
    * <li>
    添加分布式缓存文件
    * Setup the requisite accounting information for the
    * {@link DistributedCache} of the job, if necessary.
    * </li>
    * <li>
    将必要的内容都拷贝到 job执行的临时目录(jar包、切片信息、配置文件)
    * Copying the job's jar and configuration to the map-reduce system
    * directory on the distributed file-system.
    * </li>
    * <li>
    // 提交Job
    * Submitting the job to the <code>JobTracker</code> and optionally
    * monitoring it's status.
    * </li>
    */

ETL

  • 概述
    • 数据清洗:抽取,转换,加载
    • 清理过程只需要运行Mapper程序

压缩

  • 特点

    • 减少磁盘IO,存储空间
    • 增加CPU开销
  • 原则

    • 运算密集型的Job少用压缩
    • IO密集型的Job多用压缩
  • 常用格式

    格式 算法 扩展名 其他
    DEFLATE DEFLATE .deflate
    Gzip DEFLATE .gz
    bzip2 bzip2 .bz2 可切片
    LZO LZO .lzo 可切片/需安装/需建索引,指定输入格式
    Snappy Snappy .snappy
  • 常用选择

    • Map输入端
      • Hadoop自动检查文件扩展名,若能匹配则会用恰当编码方式进行压缩/解压
      • 数据量小于块大小:Snappy/LZO
      • 数据量非常大:LZO/Bzip2
    • Map输出端
      • 网络传输缓慢应考虑压缩技术
      • Snappy/LZO
    • Reduce输出端
      • 永久保存:Bzip2/Gzip
      • 作为下一个MapReduce输入:参考Map输入端
  • LZO配置

    • core-site.xml

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      <configuration>
      <property>
      <name>io.compression.codecs</name>
      <value>
      org.apache.hadoop.io.compress.GzipCodec,
      org.apache.hadoop.io.compress.DefaultCodec,
      org.apache.hadoop.io.compress.BZip2Codec,
      org.apache.hadoop.io.compress.SnappyCodec,
      com.hadoop.compression.lzo.LzoCodec,
      com.hadoop.compression.lzo.LzopCodec
      </value>
      </property>

      <property>
      <name>io.compression.codec.lzo.class</name>
      <value>com.hadoop.compression.lzo.LzoCodec</value>
      </property>
      </configuration>
    • 测试LZO压缩

      1
      2
      3
      4
      hadoop jar /hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.1.jar wordcount -Dmapreduce.output.fileoutputformat.compress=true -Dmapreduce.output.fileoutputformat.compress.codec=com.hadoop.compression.lzo.LzopCodec /input /lzo-output


      hadoop jar /hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.1.jar wordcount /input /lzo-output

Yarn

概述

  • 概述

    • 资源调度平台,为程序提供运算资源
    • Yarn相当于分布式操作系统,MapReduce相当于操作系统上的应用程序
  • 基础结构

    • ResourceManager:管理一个集群
      • 处理客户端请求
      • 监控NodeManager
      • 启动或监控ApplicationMaster
      • 资源调分配调度
    • NodeManager:管理一个节点
      • 管理单节点的资源
      • 处理来自ResourceManager的命令
      • 处理来自ApplicationMaster的命令
    • ApplicationMaster:管理一个作业
      • 为应用程序申请资源并分配给内部的任务
      • 任务的监控与容错
    • Container:相应的容器
      • 封装节点上的多种资源(CPU、内存、磁盘、网络)

工作机制(面试)

  1. YarnRunnerRM申请一个Application
  2. YarnRunnerJob运行所需资源(切片,XML,Jar包)提交到集群路径上
  3. YarnRunner提交完毕后申请运行MRAppMaster
  4. RM将请求初始化成一个Task,放入调度器
  5. NM领取Task任务,创建Container,启动MRAppMaster
  6. MRAppMaster从集群下载Job资源到本地,向RM申请MapTask容器
  7. RMTask放入调度器
  8. NM领取任务,创建容器,拷贝Job资源(NM可能不是同一个)
  9. MRAppMasterNM发送程序启动命令
  10. MapTask程序执行完毕,MRAppMasterRM申请Container运行ReduceTask程序
  11. (过程如8-9)
  12. ReduceTask程序执行完毕后,MRAppMasterRM注销自己

调度器

IFIO调度器

  • 单队列:按先后顺序
  • 生产环境不会使用

容量调度器

  • Apache默认调度器
  • 多队列:每个队列可配置一定的资源量,采用先进先出
  • 容量保证:可为每个队列设置资源上下限
  • 灵活性:若一个队列有剩余资源可以暂时共享给需要资源的队列
  • 若该队列有新程序提交,其他队列会归还借调的资源
  • 多租户:多用户调度器
    • 会限定同一用户提交作业所占资源量
  • 资源分配方式可选用FIFO、DRF

公平调度器

  • CDH框架默认调度器
  • 同队列所有任务共享资源,在时间尺度上获得公平的资源
  • 多队列
  • 容量保证
  • 灵活性
  • 多用户
  • 资源分配方式可选用FIFO、FAIR、DRF

算法

  • FIFO
    1. 优先选择资源占用最低的队列分配资源(DFS)
    2. 按照作业的优先级和提交顺序分配资源
    3. 按照容器的优先级分配资源,优先级相同按照数据本地性原则
      1. 同节点 > 同机架 > 其他
  • FAIR
    1. 优先选择对资源的缺额比例大的队列分配资源
      • 缺额:某个作业应获取资源和实际获取资源的差距
    2. 每一步优先按公平策略分配资源,再与容量调度器一致
      1. 先平均分配,把多出的资源再平均分配给缺的(若有权重则除以权重和后,按权分配)
  • DRF
    1. 对不同资源需求(内存,CPU)进行不同分配

生产环境

常用命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 查看任务
yarn application -list
# 根据状态过滤
yarn application -list -appStates [ALL/NEW/NEW_SAVING/SUBMITTED/ACCEPTED/RUNNING/FINISHED/FAILED/KILLED]
# kill掉Application
yarn application -kill 程序ID
# 查看Application日志
yarn logs -applicationId 程序ID
# 查看Container日志
yarn logs -applicationId 程序ID -containerId 容器ID
# 查看尝试运行的任务
yarn applicationAttempt -list 程序ID
# 打印尝试运行的任务的状态
yarn applicationAttempt -status 程序ID
# 查看容器(运行时查看)
yarn container -list 容器ID
# 打印容器状态(运行时查看)
yarn container -status 容器ID
# 查看所有节点状态
yarn node -list -all
# Yarn RMAdmin更新配置(刷新队列参数)
yarn rmAdmin -refreshQueues
# Yarn队列查看
yarn queue -status 队列名

核心参数

  • ReourceManager

    • 调度器,默认容量

      yarn.reourcemanager.scheduler.class

    • RM处理调度请求的线程数量,默认50

      yarn.reourcemanager.scheduler.client.thread-count

  • NodeManager

    • 让yarn检测硬件进行配置,默认false

      yarn.nodemanager.resource.detect-hardware-capabilities

    • 将虚拟核数当作CPU核数,默认false

      yarn.nodemanager.resource.count-logical-processors-as-cores

    • 虚拟核数和物理核数乘数,默认1.0(例如: 4核8线程,该参数就应设为2)

      yarn.nodemanager.resource.pcores-vcores-multiplier

    • NodeManager使用内存,默认8G

      yarn.nodemanager.resource.memory-mb

    • 为系统保留内存大小(以上二个参数配置一个即可)

      yarn.nodemanager.resource.system-reserved-memory-mb

    • NodeManager使用CPU核数,默认8个

      yarn.nodemanager.resource.cpu-vcores

    • 开启物理内存检查限制container,默认true

      yarn.nodemanager.pmem-check-enabled

    • 开启虚拟内存检查限制container,默认true

      yarn.nodemanager.vmem-check-enabled

    • 虚拟内存物理内存比例,默认2.1

      yarn.nodemanager.vmem-pmem-ratio

  • Container

    • 容器最最小内存,默认1G

      yarn.scheduler.minimum-allocation-mb

    • 容器最最大内存,默认8G

      yarn.scheduler.maximum-allocation-mb

    • 容器最小CPU核数,默认1个

      yarn.scheduler.minimum-allocation-vcores

    • 容器最大CPU核数,默认4个

      yarn.scheduler.maximum-allocation-vcores

容量调度器

  • 多队列

    • 按照业务模块的创建队列

    • 可实现任务的降级使用(按照任务重要程度调整运行重点)

    • 按需修改capacity-scheduler.xml,修改完成后使用yarn rmadmin -refreshQueue更新队列配置

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      <!-- 指定多队列,增加hive队列 -->
      <property>
      <name>yarn.scheduler.capacity.root.queues</name>
      <value>default,hive</value>
      <description>
      The queues at the this level (root is the root queue).
      </description>
      </property>

      <!-- 降低default队列资源额定容量为40%,默认100% -->
      <property>
      <name>yarn.scheduler.capacity.root.default.capacity</name>
      <value>40</value>
      </property>

      <!-- 降低default队列资源最大容量为60%,默认100% -->
      <property>
      <name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
      <value>60</value>
      </property>

      <!-- 指定hive队列的资源额定容量 -->
      <property>
      <name>yarn.scheduler.capacity.root.hive.capacity</name>
      <value>60</value>
      </property>

      <!-- 提交的用户能占总资源的多少(0.0-1.0) -->
      <property>
      <name>yarn.scheduler.capacity.root.hive.user-limit-factor</name>
      <value>1</value>
      </property>

      <!-- 指定hive队列的资源最大容量 -->
      <property>
      <name>yarn.scheduler.capacity.root.hive.maximum-capacity</name>
      <value></value>
      </property>

      <!-- 设置队列运行状态 -->
      <property>
      <name>yarn.scheduler.capacity.root.hive.state</name>
      <value>RUNNING</value>
      </property>

      <!-- 设置可向该队列提交任务的用户 -->
      <property>
      <name>yarn.scheduler.capacity.root.hive.acl_submit_applications</name>
      <value>*</value>
      </property>

      <!-- 设置可操作该队列的用户 -->
      <property>
      <name>yarn.scheduler.capacity.root.hive.acl_administer_queue</name>
      <value>*</value>
      </property>

      <!-- 哪些用户可以设置该队列优先级 -->
      <property>
      <name>yarn.scheduler.capacity.root.hive.acl_application_max_priority</name>
      <value>*</value>
      </property>

      <!-- 任务最大的生存周期 -->
      <property>
      <name>yarn.scheduler.capacity.root.hive.maximum-application-lifetime</name>
      <value>-1</value>
      </property>

      <!-- 任务默认的生存周期 -->
      <property>
      <name>yarn.scheduler.capacity.root.hive.default-application-lifetime</name>
      <value>-1</value>
      </property>
  • 任务优先级

    • Yarn将所有任务优先级限制为0,需开放限制

    • yarn-site.xml

      1
      2
      3
      4
      <property>
      <name>yarn.cluster.max-application-priority</name>
      <value>5</value>
      </property>

公平调度器

  • yarn-site.xml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    <property>
    <name>yarn.resourcemanaqer.scheduler.class</ name>
    <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
    <description>配置使用公平调度器</description>
    </property>
    <property>
    <name>yarn.scheduler.fair.allocation.file</name>
    <value>/opt/module/hadoop-3.1.3/etc/hadoop/fair-scheduler.xml</value>
    <description>指名公平调度器队列分配配置文件</description>
    </property>
    <property>
    <name>yarn.scheduler.fair.preemption<name>
    <value>false</value>
    <description>禁止队列间资源抢占</description>
    </property>
  • fair-scheduler.xml(可自定义)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    <?xml version="l.0"?>
    <allocations>
    <!--单个队列中Application Master占用资源的最大比例,取值0-1,企业一般配置0.1-->
    <queueMaxAMShareDefault>0.5</queueMaxAMShareDefault>
    <!--单个队列最大资源的默认值test atguigu default -->
    <queueMaxResourcesDefault>4096mb,4vcores</queueMaxResourcesDefault>
    <!--增加一个队列test -->
    <queue name="test">
    <!--队列最小资源-->
    <minResources>2048mb,2vcores</minResources>
    <!--队列最大资源-->
    <maxResources>409 6mb,4vcores</maxResources>
    <!--队列中最多同时运行的应用数,默认50,根据线程数配置-->
    <maxRunningApps>4</maxRunningApps>
    <!--队列中Application Master占用资源的最大比例-->
    <maxAMShare>0.5</maxAMShare>
    <!--该队列资源权重,默认值为1.0 -->
    <weight>1.0</weight>
    <!-- 队列内部的资源分配策略 --> <schedulingPolicy>fair</schedulingPolicy>
    </queue>
    <!-- 增加一个队列 atguigu -->
    <queue name="atguigu" type="parent">
    <!-- 队列最小资源 -->
    <minResources>2048mb,2vcores</minResources>
    <!-- 队列最大资源 -->
    <maxResources>4096mb,4vcores</maxResources>
    <!-- 队列中最多同时运行的应用数,默认 50,根据线程数配置 --> <maxRunningApps>4</maxRunningApps>
    <!-- 队列中 Application Master 占用资源的最大比例 --> <maxAMShare>0.5</maxAMShare>
    <!-- 该队列资源权重,默认值为 1.0 -->
    <weight>1.0</weight>
    <!-- 队列内部的资源分配策略 --> <schedulingPolicy>fair</schedulingPolicy>
    </queue>
    <!-- 任务队列分配策略,可配置多层规则,从第一个规则开始匹配,直到匹配成功 --> <queuePlacementPolicy>
    <!-- 提交任务时指定队列,如未指定提交队列,则继续匹配下一个规则; false 表示:如果指 定队列不存在,不允许自动创建-->
    <rule name="specified" create="false"/>
    <!-- 提交到 root.group.username 队列,若 root.group 不存在,不允许自动创建;若 root.group.user 不存在,允许自动创建 -->
    <rule name="nestedUserQueue" create="true">
    <rule name="primaryGroup" create="false"/>
    </rule>
    <!-- 最后一个规则必须为 reject 或者 default。Reject 表示拒绝创建提交失败, default 表示把任务提交到 default 队列 -->
    <rule name="reject" />
    </queuePlacementPolicy>
    </allocations>

Tool接口

概述

  • 用来动态修改参数

生产调优

HDFS

核心参数

  • 内存配置hadoop-env.sh

    1
    2
    3
    4
    5
    6
    7
    8
    9
    # 配置Hadoop内存
    # the JVM will autoscale based upon machine memory size.
    export HADOOP_HEAPSIZE_MAX=
    export HADOOP_HEAPSIZE_MIN=

    # 配置NameNode内存
    export HDFS_NAMENODE_OPTS="-Dhadoop.security.logger=INFO,RFAS - Xmx1024m"
    # 配置DataNode内存
    export HDFS_DATANODE_OPTS="-Dhadoop.security.logger=ERROR,RFAS -Xmx1024m"
    • 查看占用内存

      1
      2
      jmap -heap 进程号
      # MaxHeapSize = 1031798784 (984.0MB)
    • NN/DN配置建议

      • NN最小值1G,每增加1百万个block增加1G内存
      • DN最小值4G,副本总数低于4百万设置为4G,否则每增加一百万个block增加1G内存
  • 心跳并发配置hdfs-site.xml

    1
    2
    3
    4
    <property>
    <name>dfs.namenode.handler.count</name>
    <value>21</value>
    </property>
    • NameNode有一个工作线程池,用来处理不同DataNode的并发心跳以及客户端并发 的元数据操作,默认值:10

    • 大集群或有大量客户端的集群,需要增大参数

    • 企业经验
      $$
      dfs.namenode.handler.count=20 × 𝑙𝑜𝑔_e集群数量
      $$

  • 回收站配置core-site.xml

    1
    2
    3
    4
    5
    <!--配置垃圾回收时间为 1 分钟-->
    <property>
    <name>fs.trash.interval</name>
    <value>1</value>
    </property>
    • 网页上直接删除的文件不会走回收站

    • 通过程序删除的文件不会经过回收站,需要调用moveToTrash()才进入回收站

    • 在命令行利用hadoop fs -rm命令删除的文件才会走回收站

    • 参数

      • 默认值fs.trash.interval = 0,表示禁用回收站;其他值表示设置文件的存活时间
      • 默认值fs.trash.checkpoint.interval = 0,检查回收站的间隔时间
        • 值为0:和fs.trash.interval的参数值相等
      • 要求 fs.trash.checkpoint.interval <= fs.trash.interval
    • 路径:/user/用户/.Trash/…]

    • 恢复数据

      1
      hadoop fs -mv /user/atguigu/.Trash/Current/user/atguigu/input /user/atguigu/input

集群压测

  • 写性能

    • 向HDFS集群写10个128M的文件

      1
      hadoop jar /opt/module/hadoop- 3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -write -nrFiles 10 - fileSize 128MB
    • nrFilesn为生成mapTask的数量,生产环境一般可通过hadoop103:8088查看CPU 核数,设置为(CPU 核数 - 1)

    • Numberoffiles:生成mapTask数量,一般是集群中(CPU核数-1),虚拟机就按照实际的物理内存-1分配即可

    • TotalMBytesprocessed:单个map处理的文件大小

    • hroughputmb/sec:单个mapTak的吞吐量

      • 计算方式:处理的总文件大小/每一个 mapTask 写数据的时间累加
      • 集群整体吞吐量:生成 mapTask 数量*单个 mapTak 的吞吐量
    • Average IO rate mb/sec:平均 mapTak 的吞吐量

      • 计算方式:每个 mapTask 处理文件大小/每一个 mapTask 写数据的时间全部相加除以 task 数量
    • IO rate std deviation:方差、反映各个 mapTask 处理的差值,越小越均衡

    • 若测试出现异常可以在 yarn-site.xml 中设置虚拟内存检测为false,并重启

      1
      2
      3
      4
      5
      <!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true-->
      <property>
      <name>yarn.nodemanager.vmem-check-enabled</name>
      <value>false</value>
      </property>
    • 若实测速度远远小于网络,且不能满足工作需求,可以考虑采用固态硬盘或增加磁盘个数

      • 由于副本 1 就在本地,所以该副本不参与测试(10 个文件 * 2 个副本 = 20 个)
      • 如果客户端不在集群节点,那就三个副本都参与计算
  • 读性能

    • 读取HDFS集群10个128M的文件

      1
      hadoop jar /opt/module/hadoop- 3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -write -nrFiles 10 - fileSize 128MB
    • 删除测试生成数据

      1
      hadoop jar /opt/module/hadoop- 3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client- jobclient-3.1.3-tests.jar TestDFSIO -clean

多目录

  • NameNode多目录

    • NameNode的本地目录可以配置成多个,且每个目录存放内容相同,增加了可靠性

    • hdfs-site.xml

      1
      2
      3
      4
      <property>
      <name>dfs.namenode.name.dir</name>
      <value>file://${hadoop.tmp.dir}/dfs/name1,file://${hadoop.tmp. dir}/dfs/name2</value>
      </property>
    • 修改完需停止集群,删除所有节点的data和logs中所有数据后,格式化集群并启动

  • DataNode多目录

    • DataNode可以配置成多个目录,每个目录存储的数据不一样(数据不是副本)

    • hdfs-site.xml

      1
      2
      3
      4
      <property>
      <name>dfs.datanode.data.dir</name>
      <value>file://${hadoop.tmp.dir}/dfs/data1,file://${hadoop.tmp. dir}/dfs/data2</value>
      </property>
  • 磁盘间数据均衡

    • 刚加载的新硬盘没数据时可以执行磁盘数据均衡命令

      1
      2
      3
      4
      5
      6
      7
      8
      # 生成均衡计划(我们只有一块磁盘,不会生成计划) 
      hdfs diskbalancer -plan hadoop103
      # 执行均衡计划
      hdfs diskbalancer -execute hadoop103.plan.json
      # 查看当前均衡任务的执行情况
      hdfs diskbalancer -query hadoop103
      # 取消均衡任务
      hdfs diskbalancer -cancel hadoop103.plan.json

扩缩容

  • 添加白名单

    • 在白名单的主机 IP 地址可以用来存储数据,可以尽量防止黑客恶意访问攻击

    • 在 NameNode 节点的/opt/module/hadoop-3.1.3/etc/hadoop 目录下分别创建 whitelist 和 blacklist 文件

      1
      2
      hadoop102
      hadoop103
    • hdfs-site.xml 配置文件中增加 dfs.hosts 配置参数

      1
      2
      3
      4
      5
      6
      7
      8
      <!-- 白名单 -->
      <property>
      <name>dfs.hosts</name>
      <value>/opt/module/hadoop-3.1.3/etc/hadoop/whitelist</value> </property>
      <!-- 黑名单 -->
      <property>
      <name>dfs.hosts.exclude</name>
      <value>/opt/module/hadoop-3.1.3/etc/hadoop/blacklist</value> </property>
    • 分发配置文件 whitelisthdfs-site.xml

    • 第一次添加白名单必须重启集群,不是第一次,只需要刷新 NameNode 节点即可

  • 服役新服务器

    • 拷贝所有配置文件后直接启动 DataNode,即可关联到集群

    • 在白名单中增加新服役的服务器,分发并刷新NameNode

      1
      hdfs dfsadmin -refreshNodes
  • 服务器间数据均衡

    • HDFS需要启动单独的Rebalance Server执行Rebalance操作,尽量找一台较空闲的机器,而不是在NameNode上执行start-balancer.sh
    1
    2
    3
    4
    5
    # 开启数据均衡命令
    # 参数10代表集群中各节点磁盘空间利用率相差不超过10%,可根据实际情况进行调整
    sbin/start-balancer.sh -threshold 10
    # 停止数据均衡命令
    sbin/stop-balancer.sh
  • 黑名单退役服务器

    • 黑名单的主机IP地址不可以用来存储数据,企业中配置黑名单用来退役服务器

    • 编辑**/opt/module/hadoop-3.1.3/etc/hadoop** 目录下的 blacklist 文件,添加如下主机名称

    • 如果白名单中没有配置,需要在 hdfs-site.xml 配置文件中增加 dfs.hosts 配置参数

      1
      2
      3
      4
      <!-- 黑名单 -->
      <property>
      <name>dfs.hosts.exclude</name>
      <value>/opt/module/hadoop-3.1.3/etc/hadoop/blacklist</value> </property>
    • 分发配置文件:blacklist,hdfs-site.xml

    • 第一次添加黑名单必须重启集群,否则只需要刷新 NameNode 节点即可

      1
      hdfs dfsadmin -refreshNodes
    • 等待退役节点状态为 decommissioned(所有块已经复制完成),停止该节点及节点资源 管理器

      • 如果副本数是 3,服役的节点小于等于 3,是不能退役成功的,需要修改副本数后才能退役
    • 如果数据不均衡,可以用命令实现集群的再平衡

      1
      sbin/start-balancer.sh -threshold 10

存储优化

  • 纠删码

    • HDFS 默认情况下,一个文件有 3 个副本,这样提高了数据的可靠性,但也带来了 2 倍的冗余开销。3.x 引入纠删码采用计算的方式可以节省约 50%左右的存储空间
    • 一个文件拆分成多个数据单元 + 多个校验单元,存储上只比该文件大小多了多个校验单元的大小
    • 用CPU计算资源换取存储资源
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    hdfs ec
    Usage: bin/hdfs ec [COMMAND]
    [-listPolicies]
    [-addPolicies -policyFile <file>]
    [-getPolicy -path <path>]
    [-removePolicy -policy <policy>]
    [-setPolicy -path <path> [-policy <policy>] [-replicate]]
    [-unsetPolicy -path <path>]
    [-listCodecs]
    [-enablePolicy -policy <policy>]
    [-disablePolicy -policy <policy>]
    [-help <command-name>].
    • 纠删码策略是给具体一个路径设置。所有往此路径下存储的文件,都会执行此策略。默认只开启对 RS-6-3-1024k 策略的支持,如要使用别的策略需要提前启用

      • RS-6-3-1024k:使用R 编码,每6个数据单元,生成3个校验单元,共9个单元,这9个单元中只要有任意的6个单元存在(不管是数据单元还是校验单元)就可以得到原始数据
    • 案例:将/input 目录设置为 RS-3-2-1024k 策略

      1. 开启对 RS-3-2-1024k 策略的支持

        1
        hdfs ec -enablePolicy -policy RS-3-2-1024k
      2. 在 HDFS 创建目录,并设置 RS-3-2-1024k 策略

        1
        hdfs ec -setPolicy -path /input -policy RS-3-2-1024k
      • 上传的文件需要大于 2M 才能看出效果。(低于 2M,只有一个数据单元和两 个校验单元)
  • 异构存储(冷热数据分离)

    • 不同的数据存储在不同类型的硬盘中以达到最佳性能

    • 存储类型

      • RAM_DISK:内存镜像文件系统
      • SSD:SSD固态硬盘
      • DISK:普通磁盘,在HDFS中,如果没有主动声明数据目录存储类型默认都是DISK
      • ARCHIVE:没有特指哪种存储介质,主要的指的是计算能力比较弱而存储密度比较高的存储介质,用来解决数据量的 容量扩增的问题,一般用于归档
    • 存储策略(访问速度从快到慢)

      策略ID 策略名称 副本分布
      15 Lazy_Persist RAM_DISK:1,DISK:n-1
      12 All_SSD SSD:n
      10 One_SSD SSD:1,DISK:n-1
      7 Hot(default) DISK:n
      5 Warm DSIK:1,ARCHIVE:n-1
      2 Cold ARCHIVE:n
      • 当目录设置为 COLD 且未配置 ARCHIVE 存储目录的情况下,不可以向该目录直接上传文件,会报出异常

      • LAZY_PERSIST时,文件块副本都存储在DISK上的原因

      • 当客户端所在的 DataNode 节点没有 RAM_DISK 时,则会写入客户端所在的 DataNode 节点的 DISK 磁盘,其余副本会写入其他节点的 DISK 磁盘

      • 当客户端所在的 DataNode 有 RAM_DISK,但dfs.datanode.max.locked.memory参数值未设置或者设置过小(小于dfs.block.size参数值)时,则会写入客户端所在的 DataNode 节点的 DISK 磁盘,其余副本会写入其他节点的 DISK 磁盘

        1
        2
        # 查询max locked memory内存
        ulimit -a
    • Shell操作

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      # 查看当前有哪些存储策略可以用
      hdfs storagepolicies -listPolicies
      # 为指定路径(数据存储目录)设置指定的存储策略
      hdfs storagepolicies -setStoragePolicy -path xxx -policy xxx
      # 获取指定路径(数据存储目录或文件)的存储策略
      hdfs storagepolicies -getStoragePolicy -path xxx
      # 取消存储策略
      # 执行改命令后该目录或者文件以其上级目录为准,若是根目录,就是HOT
      hdfs storagepolicies -unsetStoragePolicy -path xxx
      # 查看文件块的分布
      bin/hdfs fsck xxx -files -blocks -locations
      # 查看集群节点
      hadoop dfsadmin -report
    • 配置hdfs-site.xml

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      <property>
      <name>dfs.storage.policy.enabled</name>
      <value>true</value>
      </property>
      <property>
      <name>dfs.datanode.data.dir</name>
      <!--路径前手动设置标识[存储介质],服务器会认为该路径是该存储介质-->
      <value>[SSD]file:///opt/module/hadoop-
      3.1.3/hdfsdata/ssd,[RAM_DISK]file:///opt/module/hadoop-
      3.1.3/hdfsdata/ram_disk</value>
      </property>

故障排除

  • NameNode故障处理

    • 需求:NameNode进程挂了并且存储的数据丢

    • 解决

      • 拷贝 SecondaryNameNode 中数据到原 NameNode 存储数据目录

        1
        scp -r atguigu@hadoop104:/opt/module/hadoop- 3.1.3/data/dfs/namesecondary/* ./name/
      • 重新启动 NameNode

        1
        hdfs --daemon start namenode
  • 集群安全模式&磁盘修复

    • 安全模式:文件系统只接受读请求,不接受删除、修改等变更请求

    • 进入安全模式场景

      • NameNode在加载镜像文件和编辑日志期间处于安全模式
      • NameNode再接收DataNode注册时,处于安全模式
    • 退出安全模式条件

      • dfs.namenode.safemode.min.datanodes:最小可用 datanode 数量,默认0(大于0)
      • dfs.namenode.safemode.threshold-pct:副本数达到最小要求的block占系统总block数的百分比,默认0.999f(只允许丢一个块)
      • dfs.namenode.safemode.extension:稳定时间,默认30000毫秒,即30秒
    • 基本语法

      1
      2
      3
      4
      5
      6
      7
      8
      # 查看安全模式状态
      bin/hdfsdfsadmin-safemode get
      # 进入安全模式状态
      bin/hdfs dfsadmin -safemode enter
      # 离开安全模式状态
      bin/hdfs dfsadmin -safemode leave
      # 等待安全模式状态,退出安全模式后执行下面的操作
      bin/hdfs dfsadmin -safemode wait
    • 需求:数据块损坏,进入安全模式

      1. 离开安全模式

        1
        2
        hdfs dfsadmin -safemode get
        hdfs dfsadmin -safemode leave
      2. 元数据删除

  • 慢磁盘监控

    • 一般出现慢磁盘现象,会影响到 DataNode 与 NameNode 之间的心跳。正常情况心跳时 间间隔是 3s。超过 3s 说明有异常

    • fio 命令,测试磁盘的读写性能

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      sudo yum install -y fio
      # 顺序读测试
      sudo fio -filename=/home/atguigu/test.log -direct=1 -iodepth 1 -thread - rw=read -ioengine=psync -bs=16k -size=2G -numjobs=10 -runtime=60 -group_reporting -name=test_r
      # 顺序写测试
      sudo fio -filename=/home/atguigu/test.log -direct=1 -iodepth 1 -thread -
      rw=write -ioengine=psync -bs=16k -size=2G -numjobs=10 -runtime=60 -group_reporting -name=test_w
      # 随机写测试
      sudo fio -filename=/home/atguigu/test.log -direct=1 -iodepth 1 -thread -rw=randwrite -ioengine=psync -bs=16k -size=2G -numjobs=10 -runtime=60 -group_reporting -name=test_randw
      # 混合随机读写
      sudo fio -filename=/home/atguigu/test.log -direct=1 -iodepth 1 -thread -rw=randrw -rwmixread=70 -ioengine=psync -bs=16k -size=2G -numjobs=10 -runtime=60 -group_reporting -name=test_r_w -ioscheduler=noop
  • 小文件归档

    • 问题

      • 一个1MB的文件设置为128MB的块存储,实际使用的是1MB的磁盘空间,而不是 128MB
      • 100个1k文件块和100个128m文件块占用NN内存大小一样
    • 解决

      • HDFS存档文件或HAR件,是一个更高效的文件存档工具,它将文件存入HDFS块
      • 在减少NameNode内存使用,允许对文件进行透明的访问
      • HDFS存档文件对内还是一个一个独立文件,对NameNode而言却是一个整体
    • 需要启动 YARN 进程

      1
      start-yarn.sh
    • 案例:把/input目录里面的所有文件归档成一个叫input.har的归档文件,并把归档后文件存储 到/output路径下

      1
      hadoop archive -archiveName input.har -p /input /output
    • 查看归档

      1
      2
      3
      hadoop fs -ls /output/input.har
      # 查看具体子文件
      hadoop fs -ls har:///output/input.har
    • 解归档文件

      1
      hadoop fs -cp har:///output/input.har/* /

集群迁移

  • scp 实现两个远程主机之间的文件复制

    1
    2
    3
    4
    5
    6
    # 推push
    scp -r hello.txt root@hadoop103:/user/atguigu/hello.txt
    # 拉pull
    scp -r root@hadoop103:/user/atguigu/hello.txt hello.txt
    # 是通过本地主机中转实现两个远程主机的文件复制;如果在两个远程主机之间 ssh 没有配置的情况下可以使用该方式
    scp -r root@hadoop103:/user/atguigu/hello.txt root@hadoop104:/user/atguigu
  • 采用 distcp 命令实现两个 Hadoop 集群之间的递归数据复制

    1
    2
    # 1为源 2为目标
    bin/hadoop distcp hdfs://hadoop102:8020/user/atguigu/hello.txt hdfs://hadoop105:8020/user/atguigu/hello.txt

MapReduce

效率瓶颈

  • 计算机性能
    • CPU、内存、磁盘、网络
  • I/O操作优化
    • 数据倾斜
    • Map运行时间太长,导致 Reduce等待过久
    • 小文件过多

常用优化

  • 自定义分区
    • 减少数据倾斜
    • 定义类,继承Partitioner接口,重写getPartition方法
  • 减少溢写的次数
    • mapreduce.task.io.sort.mb
      • Shuffle的环形缓冲区大小
      • 默认100m,可提高到200m
    • mapreduce.map.sort.spill.percent
      • 环形缓冲区溢出的阈值
      • 默认80% ,可提高到90%
  • 增加每次Merge合并次数
    • mapreduce.task.io.sort.factor默认10,可以提高到20
  • 采用Combiner
    • 在不影响业务结果的前提条件下
    • job.setCombinerClass(xxxReducer.class);
  • 采用Snappy或者LZO压缩
    • 减少磁盘IO
    • conf.setBoolean("mapreduce.map.output.compress", true);
    • conf.setClass("mapreduce.map.output.compress.codec",
    • SnappyCodec.class,CompressionCodec.class);
  • 提高MapTask内存上限
    • mapreduce.map.memory.mb
    • 默认1024MB,可根据128m数据对应1G内存原则提高该内存
  • 设置MapTask堆内存大小
    • mapreduce.map.java.opts
    • 与MapTask内存上限值保持一致
    • 若内存不够, 报java.lang.OutOfMemoryError
  • 增加MapTask的CPU核数
    • mapreduce.map.cpu.vcores
    • 默认1,计算密集型任务可增加核数
  • 提高Map Task最大重试次数
    • mapreduce.map.maxattempts
    • 重试次数超过该值则认为Map Task运行失败
    • 默认4,根据机器性能适当提高
  • 提高Reduce拉取Map的并行数
    • mapreduce.reduce.shuffle.parallelcopies
    • 默认5,可以提高到10
  • 提高Buffer大小占Reduce可用内存的比例
    • mapreduce.reduce.shuffle.input.buffer.percent
    • 默认0.7,可提高到0.8
  • 提高Buffer写入磁盘所达到多少比例
    • mapreduce.reduce.shuffle.merge.percent
    • 默认0.66,可提高到0.75
  • 提高ReduceTask内存上限
    • mapreduce.reduce.memory.mb
    • 默认1024MB, 根据128m数据对应1G内存原则适当提高内存到4-6G
  • 调整ReduceTask堆内存大小
    • mapreduce.reduce.java.opts
    • 与ReduceTask内存上限值保持一致
    • 若内存不够, 报java.lang.OutOfMemoryError
  • 提高ReduceTask的CPU核数
    • mapreduce.reduce.cpu.vcores
    • 默认1个,可提高到2-4个
  • 提高Reduce Task最大重试次数
    • mapreduce.reduce.maxattempts
    • 重试次数超过该值则认为Map Task运行失败
    • 默认4
  • 设置MapTask完成的比例达到该值后才会为ReduceTask申请资源
    • mapreduce.job.reduce.slowstart.completedmaps
    • 默认0.05
  • 提高超时阈值
    • mapreduce.task.timeout
    • 若一个Task在一定时间内没有任何进入(即不会读取新的数据,也没有输出数据)则认为该Task处于Block状态(可能是卡住了,也许永远会卡住)为防止因为用户程序永远Block不退出,则强制设置了一个该超时时间(单位毫秒)
    • 默认是600000(10分钟),若程序对每条输入数据的处理时间过长,将该参数调大
  • Reduce能不用就不用

数据倾斜问题

  • 现象
    • 数据频率倾斜:一个区域的数据量要远远大于其他区域
    • 数据大小倾斜:部分记录的大小远远大于平均值
  • 解决
    • 首先检查是否空值过多造成的
      • 生产环境可以直接过滤掉空值
      • 若想保留空值就自定义分区,将空值加随机数打散,再二次聚合
    • map阶段能提前处理就处理
      • Combiner、MapJoin
    • 设置多个reduce个数

Yarn

常用调优参数

  • Resourcemanager相关

    1
    2
    3
    4
    # 处理调度 器请求的线程数量
    yarn.resourcemanager.scheduler.client.thread-count ResourceManager
    # 配置调度器
    yarn.resourcemanager.scheduler.class
  • Nodemanager相关

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    # 使用内存数 
    yarn.nodemanager.resource.memory-mb NodeManager
    # NodeManager为系统保留 多少内存,和上一个参数二者取一即可
    yarn.nodemanager.resource.system-reserved-memory-mb
    # 使用 CPU 核数
    yarn.nodemanager.resource.cpu-vcores NodeManager
    # 是否将虚拟核 数当作 CPU 核数
    yarn.nodemanager.resource.count-logical-processors-as-cores
    # 虚拟核数和物理核数乘数,例 如:4 核 8 线程,该参数就应设为 2
    yarn.nodemanager.resource.pcores-vcores-multiplier
    # 是否让 yarn 自己检测硬 件进行配置
    yarn.nodemanager.resource.detect-hardware-capabilities
    # 是否开启物理内存检查限制 container
    yarn.nodemanager.pmem-check-enabled
    # 是否开启虚拟内存检查限制 container
    yarn.nodemanager.vmem-check-enabled
    # 虚拟内存物理内存比例
    yarn.nodemanager.vmem-pmem-ratio
  • Container容器相关

    1
    2
    3
    4
    5
    6
    7
    8
    # 容器最小内存
    yarn.scheduler.minimum-allocation-mb
    # 容器最大内存
    yarn.scheduler.maximum-allocation-mb
    # 容器最小核数
    yarn.scheduler.minimum-allocation-vcores
    # 容器最大核数
    yarn.scheduler.maximum-allocation-vcores

容量调度器使用

公平调度器使用

综合调优

小文件优化

  • 问题

    • 大量占用NameNode内存空间
    • 元数据文件过多,寻址索引速度变慢
    • MapTask的处理时间比启动时间还小
  • 解决

    1. 在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS(数据源头)

    2. Hadoop Archive(存储方向)

      • 将多个小文件打包成一个 HAR 文件,从而达到减少 NameNode 的内存使用
    3. CombineTextInputFormat(计算方向)

      • 将多个小文件在切片过程中生成一个单独的切片或者少 量的切片
    4. 开启uber模式,实现JVM重用(计算方向)

      • 默认情况每个 Task 任务都需要启动一个 JVM 来运行,如果 Task 任务计算的数据

        量很小,可以让同一个 Job 的多个 Task 运行在一个 JVM 中,不必为每个 Task 都开启 一个 JVM

      • 在 mapred-site.xml中开启uber模式

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        <!-- 开启uber模式,默认关闭-->
        <property>
        <name>mapreduce.job.ubertask.enable</name><value>true</value>
        </property>
        <!--uber模式中最大的mapTask数量,可向下修改 -->
        <property>
        <name>mapreduce.job.ubertask.maxmaps</name>
        <value>9</value>
        </property>
        <!-- uber 模式中最大的 reduce 数量,可向下修改 -->
        <property>
        <name>mapreduce.job.ubertask.maxreduces</name>
        <value>1</value>
        </property>
        <!-- uber 模式中最大的输入数据量,默认使用 dfs.blocksize 的值,可向下修 改 -->
        <property>
        <name>mapreduce.job.ubertask.maxbytes</name>
        <value></value>
        </property>
      • 分发配置

测试MapReduce计算性能

  • 使用 Sort 程序评测 MapReduce

    • 一个虚拟机不超过 150G 磁盘尽量不要执行这段代码

    • 使用 RandomWriter 来产生随机数,每个节点运行 10 个 Map 任务,每个 Map 产

      生大约 1G 大小的二进制随机数

      1
      hadoop jar /opt/module/hadoop- 3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples- 3.1.3.jar randomwriter random-data
    • 执行 Sort 程序

      1
      hadoop jar /opt/module/hadoop- 3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples- 3.1.3.jar sort random-data sorted-data
    • 验证数据是否真正排好序了

      1
      hadoop jar /opt/module/hadoop- 3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client- jobclient-3.1.3-tests.jar testmapredsort -sortInput random-data -sortOutput sorted-data

企业开发场景案例

  • 需求

    • 从 1G 数据中,统计每个单词出现次数。服务器 3 台,每台配置 4G 内存, 4 核 CPU,4 线程
    • 1G / 128m = 8 个 MapTask
    • 1 个 ReduceTask;
    • 1 个 mrAppMaster
    • 平均每个节点运行10个 / 3台 ≈ 3个任务(4 3 3)
  • HDFS参数调优

    • hadoop-env.sh

      1
      2
      export HDFS_NAMENODE_OPTS="-Dhadoop.security.logger=INFO,RFAS - Xmx1024m"
      export HDFS_DATANODE_OPTS="-Dhadoop.security.logger=ERROR,RFAS -Xmx1024m"
    • hdfs-site.xml

      1
      2
      3
      4
      5
      <!-- NameNode 有一个工作线程池,默认值是 10 -->
      <property>
      <name>dfs.namenode.handler.count</name>
      <value>21</value>
      </property>
    • core-site.xml

      1
      2
      3
      4
      5
      <!-- 配置垃圾回收时间为 60 分钟 -->
      <property>
      <name>fs.trash.interval</name>
      <value>60</value>
      </property>
    • 分发配置

  • MapReduce参数调优

    • mapred-site.xml

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      <!-- 环形缓冲区大小,默认 100m -->
      <property>
      <name>mapreduce.task.io.sort.mb</name>
      <value>100</value>
      </property>
      <!-- 环形缓冲区溢写阈值,默认 0.8 -->
      <property>
      <name>mapreduce.map.sort.spill.percent</name>
      <value>0.80</value>
      </property>
      <!-- merge 合并次数,默认 10 个 -->
      <property>
      <name>mapreduce.task.io.sort.factor</name>
      <value>10</value>
      </property>
      <!-- maptask 内存,默认 1g; maptask 堆内存大小默认和该值大小一致 mapreduce.map.java.opts -->
      <property>
      <name>mapreduce.map.memory.mb</name>
      <value>-1</value>
      <description>The amount of memory to request from the
      scheduler for each map task. If this is not specified or is non-positive, it is inferred from mapreduce.map.java.opts and mapreduce.job.heap.memory-mb.ratio. If java-opts are also not specified, we set it to 1024.
      </description>
      </property>
      <!-- matask 的 CPU 核数,默认 1 个 -->
      <property>
      <name>mapreduce.map.cpu.vcores</name>
      <value>1</value>
      </property>
      <!-- matask 异常重试次数,默认 4 次 -->
      <property>
      <name>mapreduce.map.maxattempts</name>
      <value>4</value>
      </property>
      <!-- 每个 Reduce 去 Map 中拉取数据的并行数。默认值是 5 -->
      <property>
      <name>mapreduce.reduce.shuffle.parallelcopies</name>
      <value>5</value>
      </property>
      <!-- Buffer 大小占 Reduce 可用内存的比例,默认值 0.7 -->
      <property>
      <name>mapreduce.reduce.shuffle.input.buffer.percent</name>
      <value>0.70</value>
      </property>
      <!-- Buffer 中的数据达到多少比例开始写入磁盘,默认值 0.66。 --> <property>
      <name>mapreduce.reduce.shuffle.merge.percent</name>
      <value>0.66</value>
      </property>
      <!-- reducetask 内存,默认 1g;reducetask 堆内存大小默认和该值大小一致 mapreduce.reduce.java.opts -->
      <property>
      <name>mapreduce.reduce.memory.mb</name>
      <value>-1</value>
      <description>The amount of memory to request from the
      scheduler for each reduce task. If this is not specified or is non-positive, it is inferred
      from mapreduce.reduce.java.opts and mapreduce.job.heap.memory-mb.ratio.
      If java-opts are also not specified, we set it to 1024.
      </description>
      </property>
      <!-- reducetask 的 CPU 核数,默认 1 个 -->
      <property>
      <name>mapreduce.reduce.cpu.vcores</name>
      <value>2</value>
      </property>
      <!-- reducetask 失败重试次数,默认 4 次 -->
      <property>
      <name>mapreduce.reduce.maxattempts</name>
      <value>4</value>
      </property>
      <!-- 当MapTask完成的比例达到该值后才会为ReduceTask申请资源。默认是0.05 -->
      <property> <name>mapreduce.job.reduce.slowstart.completedmaps</name> <value>0.05</value>
      </property>
      <!-- 如果程序在规定的默认 10 分钟内没有读到数据,将强制超时退出 --> <property>
      <name>mapreduce.task.timeout</name>
      <value>600000</value>
      </property>
    • 分发配置

  • Yarn参数调优

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    <!-- 选择调度器,默认容量 -->
    <property>
    <description>The class to use as the resource scheduler.</description> <name>yarn.resourcemanager.scheduler.class</name> <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capaci
    ty.CapacityScheduler</value>
    </property>
    <!-- ResourceManager处理调度器请求的线程数量,默认50;如果提交的任务数大于50,可以 增加该值,但是不能超过 3 台 * 4 线程 = 12 线程(去除其他应用程序实际不能超过 8) -->
    <property>
    <description>Number of threads to handle scheduler
    interface.</description>
    <name>yarn.resourcemanager.scheduler.client.thread-count</name>
    <value>8</value>
    </property>
    <!-- 是否让 yarn 自动检测硬件进行配置,默认是 false,如果该节点有很多其他应用程序,建议 手动配置。如果该节点没有其他应用程序,可以采用自动 -->
    <property>
    <description>Enable auto-detection of node capabilities such as
    memory and CPU.
    </description>
    <name>yarn.nodemanager.resource.detect-hardware-capabilities</name>
    <value>false</value>
    </property>
    <!-- 是否将虚拟核数当作 CPU 核数,默认是 false,采用物理 CPU 核数 --> <property>
    <description>Flag to determine if logical processors(such as
    hyperthreads) should be counted as cores. Only applicable on Linux
    when yarn.nodemanager.resource.cpu-vcores is set to -1 and
    yarn.nodemanager.resource.detect-hardware-capabilities is true.
    </description>
    <name>yarn.nodemanager.resource.count-logical-processors-as-
    cores</name>
    <value>false</value>
    </property>
    <!-- 虚拟核数和物理核数乘数,默认是 1.0 -->
    <property>
    <description>Multiplier to determine how to convert phyiscal cores to
    vcores. This value is used if yarn.nodemanager.resource.cpu-vcores
    is set to -1(which implies auto-calculate vcores) and yarn.nodemanager.resource.detect-hardware-capabilities is set to true.
    The number of vcores will be calculated as number of CPUs * multiplier. </description>
    <name>yarn.nodemanager.resource.pcores-vcores-multiplier</name>
    <value>1.0</value>
    </property>
    <!-- NodeManager 使用内存数,默认 8G,修改为 4G 内存 -->
    <property>
    <description>Amount of physical memory, in MB, that can be allocated
    for containers. If set to -1 and
    yarn.nodemanager.resource.detect-hardware-capabilities is true, it is
    automatically calculated(in case of Windows and Linux).In other cases, the default is 8192MB.
    </description>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>4096</value>
    </property>
    <!-- nodemanager 的 CPU 核数,不按照硬件环境自动设定时默认是 8 个,修改为 4 个 -->
    <property>
    <description>Number of vcores that can be allocated
    for containers. This is used by the RM scheduler when allocating
    resources for containers. This is not used to limit the number of
    CPUs used by YARN containers. If it is set to -1 and
    yarn.nodemanager.resource.detect-hardware-capabilities is true, it is
    automatically determined from the hardware in case of Windows and Linux.
    In other cases, number of vcores is 8 by default.</description>
    <name>yarn.nodemanager.resource.cpu-vcores</name>
    <value>4</value>
    </property>
    <!-- 容器最小内存,默认 1G -->
    <property>
    <description>The minimum allocation for every container request at the RM in MBs. Memory requests lower than this will be set to the value of this property. Additionally, a node manager that is configured to have less memory than this value will be shut down by the resource manager.
    </description>
    <name>yarn.scheduler.minimum-allocation-mb</name>
    <value>1024</value>
    </property>
    <!-- 容器最大内存,默认 8G,修改为 2G -->
    <property>
    <description>The maximum allocation for every container request at the RM in MBs. Memory requests higher than this will throw an
    InvalidResourceRequestException.
    </description>
    <name>yarn.scheduler.maximum-allocation-mb</name>
    <value>2048</value>
    </property>
    <!-- 容器最小 CPU 核数,默认 1 个 -->
    <property>
    <description>The minimum allocation for every container request at the RM in terms of virtual CPU cores. Requests lower than this will be set to the value of this property. Additionally, a node manager that is configured to have fewer virtual cores than this value will be shut down by the resource manager.
    </description>
    <name>yarn.scheduler.minimum-allocation-vcores</name>
    <value>1</value>
    </property>
    <!-- 容器最大 CPU 核数,默认 4 个,修改为 2 个 -->
    <property>
    <description>The maximum allocation for every container request at the RM in terms of virtual CPU cores. Requests higher than this will throw an
    InvalidResourceRequestException.</description>
    <name>yarn.scheduler.maximum-allocation-vcores</name>
    <value>2</value>
    </property>
    <!-- 虚拟内存检查,默认打开,修改为关闭 -->
    <property>
    <description>Whether virtual memory limits will be enforced for
    containers.</description>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
    </property>
    <!-- 虚拟内存和物理内存设置比例,默认 2.1 -->
    <property>
    <description>Ratio between virtual memory to physical memory when setting memory limits for containers. Container allocations are expressed in terms of physical memory, and virtual memory usage is
    allowed to exceed this allocation by this ratio.
    </description>
    <name>yarn.nodemanager.vmem-pmem-ratio</name>
    <value>2.1</value>
    </property>

源码

启动

NameNode

  1. 启动9870端口服务
  2. 加载镜像文件和编辑日志
  3. 初始化NN的RPC服务端
  4. NN启动资源检查
  5. DN心跳超时判断
  6. 安全模式

DataNode

  1. 初始化DataXceiverServer
  2. 初始化HTTP服务
  3. 初始化DN的PRC服务端
  4. DN向NN注册
  5. 向NN发送心跳

HDFS上传

Create创建过程

  1. DN向NN发起创建请求
  2. NN处理DN的创建请求
  3. DataStreamer启动

Write上传过程

  1. 向DataStreamer的队列写数据
  2. 建立管道
    1. 机架感知
    2. Socket发送
    3. Socket接收
  3. 客户端接受DN写数据应答

HA高可用

概述

  • 单NN的问题
    • NN如果故障了,整个HDFS集群就不可用(中心化集群)
    • 解决方案:配置多个NN
      • 只有一台NN对外提供服务,其他的NN都是替补(Standby),提供服务的NN宕机时其他的NN自动切换成Active状态(采用高可用集群中的自动故障转移机制来完成切换)
      • 不需要2NN
      • 会添加一个新的服务:JournalNode
        • JournalNode:主要负责编辑日志文件的内容的共享
          • 和Zookeeper集群很像,也要搭建成一个集群的状态,存活机器数量过半就能正常提供服务
  • YARN集群存在RM单点故障问题
    • 参照HDFS的分析

搭建

准备工作

  1. 删除一些多余的目录文件 保证是一个初始化集群的状态

    1
    rm -rf data/ logs/

正式搭建

  1. 修改配置文件 core-site.xml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    <configuration>
    <!-- 把多个NameNode的地址组装成一个集群mycluster(名字自定义) -->
    <property>
    <name>fs.defaultFS</name>
    <value>hdfs://bigdata</value>
    </property>

    <!-- 指定hadoop数据的存储目录 -->
    <property>
    <name>hadoop.tmp.dir</name>
    <value>/hadoop/data</value>
    </property>

    <!-- 配置HDFS网页登录使用的静态用户为root -->
    <property>
    <name>hadoop.http.staticuser.user</name>
    <value>root</value>
    </property>

    <!-- 配置该root(superUser)允许通过代理访问的主机节点 -->
    <property>
    <name>hadoop.proxyuser.root.hosts</name>
    <value>*</value>
    </property>
    <!-- 配置该root(superUser)允许通过代理用户所属组 -->
    <property>
    <name>hadoop.proxyuser.root.groups</name>
    <value>*</value>
    </property>
    <!-- 配置该root(superUser)允许通过代理的用户-->
    <property>
    <name>hadoop.proxyuser.root.groups</name>
    <value>*</value>
    </property>
    </configuration>
  2. 修改配置文件 hdfs-site.xml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    <configuration>
    <!-- NameNode数据存储目录 -->
    <property>
    <name>dfs.namenode.name.dir</name>
    <value>file://${hadoop.tmp.dir}/name</value>
    </property>
    <!-- DataNode数据存储目录 -->
    <property>
    <name>dfs.datanode.data.dir</name>
    <value>file://${hadoop.tmp.dir}/data</value>
    </property>
    <!-- JournalNode数据存储目录 -->
    <property>
    <name>dfs.journalnode.edits.dir</name>
    <value>${hadoop.tmp.dir}/jn</value>
    </property>
    <!-- 完全分布式集群名称 -->
    <property>
    <name>dfs.nameservices</name>
    <value>bigdata</value>
    </property>
    <!-- 集群中NameNode节点都有哪些 -->
    <property>
    <name>dfs.ha.namenodes.bigdata</name>
    <value>nn1,nn2,nn3</value>
    </property>
    <!-- NameNode的RPC通信地址 -->
    <property>
    <name>dfs.namenode.rpc-address.bigdata.nn1</name>
    <value>bigdata1:8020</value>
    </property>
    <property>
    <name>dfs.namenode.rpc-address.bigdata.nn2</name>
    <value>bigdata2:8020</value>
    </property>
    <property>
    <name>dfs.namenode.rpc-address.bigdata.nn3</name>
    <value>bigdata3:8020</value>
    </property>
    <!-- NameNode的http通信地址 -->
    <property>
    <name>dfs.namenode.http-address.bigdata.nn1</name>
    <value>bigdata1:9870</value>
    </property>
    <property>
    <name>dfs.namenode.http-address.bigdata.nn2</name>
    <value>bigdata2:9870</value>
    </property>
    <property>
    <name>dfs.namenode.http-address.bigdata.nn3</name>
    <value>bigdata3:9870</value>
    </property>
    <!-- 指定NameNode元数据在JournalNode上的存放位置 -->
    <property>
    <name>dfs.namenode.shared.edits.dir</name>
    <value>qjournal://bigdata1:8485;bigdata2:8485;bigdata3:8485/bigdata</value>
    </property>
    <!-- 访问代理类:client用于确定哪个NameNode为Active -->
    <property>
    <name>dfs.client.failover.proxy.provider.bigdata</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
    </property>
    <!-- 配置隔离机制,即同一时刻只能有一台服务器对外响应 -->
    <property>
    <name>dfs.ha.fencing.methods</name>
    <value>sshfence</value>
    </property>
    <!-- 使用隔离机制时需要ssh秘钥登录-->
    <property>
    <name>dfs.ha.fencing.ssh.private-key-files</name>
    <value>/root/.ssh/id_rsa</value>
    </property>
    </configuration>
  3. 在各个JournalNode节点上,输入以下命令启动journalnode服务

    1
    hdfs --daemon start journalnode
  4. 在 nn1 上,对其进行格式化,并启动

    1
    2
    hdfs namenode -format
    hdfs --daemon start namenode
  5. 在 nn2 和 3的nn3上,同步nn1的元数据信息

    1
    hdfs namenode -bootstrapStandby
  6. 分别在 hadoop103上启动nn1 和 hadoop104上启动nn2

    1
    hdfs --daemon start namenode
  7. 在每台机器上启动DN

    1
    hdfs --daemon start datanode
  8. 将其中的一个nn切换成Active状态

    1
    hdfs haadmin -transitionToActive nn1
  9. 查看是否Active

    1
    hdfs haadmin -getServiceState nn1   

HDFS集群规划

  • ZKFC
    • 集群启动时,每台NN的ZKFC会到ZK指定的节点上写入自己的内容,哪台NN写入成功就转为Active状态
    • ZKFC通过ping的方式来检测当前NN的健康状态,一旦发现当前NN不可用就会立刻通知ZK当前NN不可用
    • ZK把属于之前active的NN写入的内容删除后,通过通知机制告知其他NN的ZKFC进程,让其他NN争抢写入
  • 规划
    • BigData1: Namenode Datanode JournalNode ZKFC ZK
    • BigData2 : Namenode Datanode JournalNode ZKFC ZK
    • BigData3 : Namenode Datanode JournalNode ZKFC ZK

故障自动转移

  1. 在start-dfs.sh中添加

    1
    export HDFS_ZKFC_USER=root
  2. 在core-site.xml文件中增加

    1
    2
    3
    4
    5
    <!-- 指定zkfc要连接的zkServer地址 -->
    <property>
    <name>ha.zookeeper.quorum</name>
    <value>bigdata1:2181,bigdata2:2181,bigdata3:2181</value>
    </property>
  3. 在hdfs-site.xml中增加

    1
    2
    3
    4
    5
    <!-- 启用nn故障自动转移 -->
    <property>
    <name>dfs.ha.automatic-failover.enabled</name>
    <value>true</value>
    </property>
  4. 修改后分发配置文件

  5. 关闭HDFS集群

    1
    stop-dfs.sh
  6. 启动Zookeeper集群

    1
    zk.sh start
  7. 初始化HA在Zookeeper中状态

    1
    hdfs zkfc -formatZK
  8. 启动HDFS服务

    1
    start-dfs.sh
  9. 可以去zkCli.sh客户端查看Namenode选举锁节点内容

    1
    get /hadoop-ha/mycluster/ActiveStandbyElectorLock
  10. 测试故障自动转移

    1. 将当前状态为Active的namenode 杀死
    2. 刷新另外两台namenode的web端,关注状态
    3. 最后可以到zk中验证锁内容的名称

YARN集群规划

  • hadoop102:Namenode Datanode JournalNode ZKFC ZK ResourceManager NodeManager
  • hadoop103:Namenode Datanode JournalNode ZKFC ZK ResourceManager NodeManager
  • hadoop104:Namenode Datanode JournalNode ZKFC ZK ResourceManager NodeManager

YARN集群搭建

  1. 修改yarn-site.xml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    <configuration>
    <!-- 启用resourcemanager ha -->
    <property>
    <name>yarn.resourcemanager.ha.enabled</name>
    <value>true</value>
    </property>
    <!-- 声明三台resourcemanager的地址 -->
    <property>
    <name>yarn.resourcemanager.cluster-id</name>
    <value>cluster-yarn</value>
    </property>
    <!--指定resourcemanager的逻辑列表-->
    <property>
    <name>yarn.resourcemanager.ha.rm-ids</name>
    <value>rm1,rm2,rm3</value>
    </property>

    <!-- ========== rm1的配置 ========== -->
    <!-- 指定rm1的主机名 -->
    <property>
    <name>yarn.resourcemanager.hostname.rm1</name>
    <value>bigdata1</value>
    </property>
    <!-- 指定rm1的web端地址 -->
    <property>
    <name>yarn.resourcemanager.webapp.address.rm1</name>
    <value>bigdata1:8088</value>
    </property>
    <!-- 指定rm1的内部通信地址 -->
    <property>
    <name>yarn.resourcemanager.address.rm1</name>
    <value>bigdata1:8032</value>
    </property>
    <!-- 指定AM向rm1申请资源的地址 -->
    <property>
    <name>yarn.resourcemanager.scheduler.address.rm1</name>
    <value>bigdata1:8030</value>
    </property>
    <!-- 指定供NM连接的地址 -->
    <property>
    <name>yarn.resourcemanager.resource-tracker.address.rm1</name>
    <value>bigdata1:8031</value>
    </property>

    <!-- ========== rm2的配置 ========== -->
    <!-- 指定rm2的主机名 -->
    <property>
    <name>yarn.resourcemanager.hostname.rm2</name>
    <value>bigdata2</value>
    </property>
    <property>
    <name>yarn.resourcemanager.webapp.address.rm2</name>
    <value>bigdata2:8088</value>
    </property>
    <property>
    <name>yarn.resourcemanager.address.rm2</name>
    <value>bigdata2:8032</value>
    </property>
    <property>
    <name>yarn.resourcemanager.scheduler.address.rm2</name>
    <value>bigdata2:8030</value>
    </property>
    <property>
    <name>yarn.resourcemanager.resource-tracker.address.rm2</name>
    <value>bigdata2:8031</value>
    </property>

    <!-- ========== rm3的配置 ========== -->
    <!-- 指定rm3的主机名 -->
    <property>
    <name>yarn.resourcemanager.hostname.rm3</name>
    <value>bigdata3</value>
    </property>
    <property>
    <name>yarn.resourcemanager.webapp.address.rm3</name>
    <value>bigdata3:8088</value>
    </property>
    <property>
    <name>yarn.resourcemanager.address.rm3</name>
    <value>bigdata3:8032</value>
    </property>
    <property>
    <name>yarn.resourcemanager.scheduler.address.rm3</name>
    <value>bigdata3:8030</value>
    </property>
    <property>
    <name>yarn.resourcemanager.resource-tracker.address.rm3</name>
    <value>bigdata3:8031</value>
    </property>

    <!-- 指定zookeeper集群的地址 -->
    <property>
    <name>yarn.resourcemanager.zk-address</name>
    <value>bigdata1:2181,bigdata2:2181,bigdata3:2181</value>
    </property>

    <!-- 启用自动恢复 -->
    <property>
    <name>yarn.resourcemanager.recovery.enabled</name>
    <value>true</value>
    </property>

    <!-- 指定resourcemanager的状态信息存储在zookeeper集群 -->
    <property>
    <name>yarn.resourcemanager.store.class</name>
    <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
    </property>
    </configuration>
  2. 将yarn-site.xml文件进行分发

  3. 在任意的机器上启动yarn

    1
    start-yarn.sh

联邦架构(了解)

  • 当集群中数据量超级大时,NameNode的内存成了性能的瓶颈,所以提出了联邦机制

  • 原理

    • 将NameNode划分成不同的命名空间并进行编号。不同的命名空间之间相互隔离互不干扰
    • 在DataNode中创建目录,此目录对应命名空间的编号。由此,编号相同的数据由对应的命名空间进行管理
    1
    2
    3
    128G * 1024(M) * 1024(KB) * 1024(bety) / 150 = xxx
    xxx * 256M = yyy
    yyy / 1024(G) / 1024(TB) / 1024(PB) = 200 左右PB的数据

Hadoop
http://docs.mousse.cc/Hadoop/
作者
Mocha Mousse
发布于
2025年5月26日
许可协议