声明: 1. 本文为我的个人复习总结, 并非那种从零基础开始普及知识 内容详细全面, 言辞官方的文章
2. 由于是个人总结, 所以用最精简的话语来写文章
3. 若有错误不当之处, 请指出
Writable类型:
Java类型 |
Hadoop Writable类型 |
Boolean |
BooleanWritable |
Byte |
ByteWritable |
Int |
IntWritable |
Float |
FloatWritable |
Long |
LongWritable |
Double |
DoubleWritable |
String |
Text |
Map |
MapWritable |
Array |
ArrayWritable |
工作机制:
MapTask工作机制:
- Read阶段: 通过用户编写的RecordReader, 从输入InputSplit中解析出一个个key-value
- Map阶段: 该节点主要是将解析出的key/value交给用户编写map( )函数处理, 并产生一系列新的key-value
- Collect收集阶段: 调用OutputCollector.collect( )输出结果。它会将生成的key/value分区(调用Partitioner), 写入环形缓冲区
- Shuffle的Spill阶段
- Shuffle的Combine阶段
ReduceTask工作机制:
-
Copy阶段: ReduceTask拷贝MapTask计算出的数据; 并针对某一片数据, 如果其大小超过一定阈值, 则写到磁盘上, 否则直接放到内存中
-
Sort阶段: 对所有数据全局进行一次归并排序
-
Reduce阶段: reduce( )函数将计算结果输出
切片规则:
- 切片时不考虑整体, 而是每个Block块都单独进行切片
- 切片大小默认等于Block大小, 使得切片效果最佳 使小文件切片少一些
- 一个切片会启动一个MapTask
几种不同的切片机制:
-
FileInputFormat
-
看剩余部分是否在1.1倍Block范围内,不超过则剩余部分就按一块来切; 所以切片最大为1.1倍Block
-
计算切片大小的源码 computeSplitSize(Math.max(minSize,Math.min(maxSize,blockSize)))
要是想调小切片大小, 就应该把maxSize值调小
要是想调大切片大小, 就应该把minSize值调大
则150M文件被切成128M+22M
-
CombineTextInputFormat
虚拟存储切片最大值设置: CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4m
切片过程: setMaxInputSplitSize的最大值记住maxSize
-
虚拟存储过程(虚拟存储文件块切分):
- <=maxSize时, 先逻辑上划为一个块
- >maxSize && <=2*maxSize时, 逻辑上均分成2个块 ((防止出现太小切片; 这块太小了, 切了更小后可以和别的组进行合并成大的)
- >2*maxSize时, 逻辑上以maxSize进行切块
-
切片过程(对虚拟存储文件块 小文件合并,然后切片):
<maxSize时, 和下一个虚拟存储文件块进行合并, 形成一个切片
>=maxSize时, 直接形成一个切片
Shuffle:
ReduceTask数量=0时, 不会执行Shuffle
环形缓冲区:
默认大小100M, 达到80%开始溢写;
这个缓冲区本质上是一个数组, 叫环形是因为两边分别存 索引 和 实际数据, 排序时是排索引, 溢写时的排序按索引去查找实际数据(我推测索引应该近似于key值, 去找实际的K-V)
-
发生在Map之后, Reduce之前 combiner就是发生在这里的
-
期间有3个排序:
- 发生在Map端: 在环形缓冲区里getPatrtition( ) 后, 溢写时按key的索引进行快排
- 发生在Map端: 溢写的小文件内的同一分区内有序, 接下来溢写小文件间取10个组队进行归并排序
- 发生在Reduce端: Reduce读取了Map端的文件, 由于Map端输出的文件大小>内存大小, 想要用小内存去排序 众多内部分区间内已有序的文件, 进行了借助磁盘的全局的归并排序(类似于1G内存对100G数据进行排序)
然后在Reduce端: 同一个Reduce分区中的数据按key分组后, 就可以执行reduce方法了
这些排序是默认发生的, 它是为了reduce方法的同key在一组的分组, 排序有利于分组的进行
优化:
- 自定义分区重写getPatrtition( ), 把热点key拼接随机值进行打散, 避免发生数据倾斜
- 增大环形缓冲区大小 或 提高溢写文件的阈值百分比(减小了溢写文件的个数, 后面归并方便些)
- 在不影响业务逻辑的前提下, 使用combiner预聚合, 减少Map到Reduce的数据传输
- 增大一次进行归并文件的个数
- 采用压缩, 减少磁盘IO和数据网络传输
- 提高Reduce端拉取Map端的文件个数
数据倾斜:
数据倾斜是热点数据造成的
数据倾斜有计算倾斜(reduce端数据分布不均)和存储倾斜(HBase的Region存储数据分布不均)
MR解决计算数据倾斜的方案:
-
在不影响业务的前提下, 提前在map进行预聚合combine, 减少传输的数据量
map数量较多, 这点预聚合计算任务对map而言并不重
-
处理热点数据(key)
-
key为NULL时
- 属于异常数据就提前过滤掉
- 不属于异常数据就拼接随机值
-
key不为NULL时
拼接随机值
然后进行两阶段MR聚合:
第一次MR带着随机值聚合一部分, 即局部聚合;
第二次MR去掉随机值进行最终聚合 即全局聚合
所谓的随机值, 并不是UUID完全随机, 因为那样第一个MR相当于没干任何聚合的活, 第二个MR拆掉后缀随机值后照样数据倾斜;
应该是某一个区间内的随机值(如随机值%100), 当1亿个同key的数据%100 [0,99]分区进行聚合, 第二个MR去掉后缀随机值后只需要聚合的是这100个同key的数据, 任务量就很小了
-
实现自定义分区
重写getPatrtition( ), 把热点key拼接随机值进行打散, 避免发生数据倾斜
-
Join操作时使用MapJoin, 提前加载数据到内存,再用Mapper去执行join逻辑; 没有Reducer了就减少了数据倾斜发生的概率
Mapper很多, 且key并不是按hash( )取模 决定放在哪一台机器的, 所以一般就不会数据倾斜
Reducer很少, 且key按hash( )取模 决定放在哪一台机器, 容易产生数据倾斜
Map不是越多越好, 因为其本身也要占用资源, 启动也慢
Reduce不是越多越好:
**Map端的数据分布到Reduce端的各个Partition: **
默认的getPatition( ) 是hash(数据)%ReduceTask数量, 分区号从0开始算
如果继承Partition方法 自定义分区不合理, 可能出现以下状况:
以下getPatition( )数量, 是对从0开始递增数分区而言的, 找不到指定分区肯定报错
- ReduceTask数量>getPatition( )数量, 则会多产生几个空的输出文件part-r-000xx
- 1<ReduceTask数量<getPatition( )数量, 则会发生异常, 因为一部分分区数据无处放置
- ReduceTask数量=1, 则不管Map端输出多少个分区文件, 都会交给这个ReduceTask去执行
- ReduceTask数量=0时, 不会执行Shuffle
自定义Combiner: 继承Reducer, 重写reduce方法
自定义排序: 实现WritableComparable接口重写compareTo方法
排序分类:
三种join:
Map端和Reduce端的setup( )方法, 是初始化方法, 可以获取到Context从而获取到文件名
用flag字段 来标识着是哪张表
-
ReduceJoin
setup( )加载提取文件名,
map( )根据不同文件名进行提取不同字段
reduce( )里进行join
-
MapJoin (适用于有一张是小表)
// 加载缓存数据
job.addCacheFile(new URI(“file:///e:/input/inputcache/pd.txt”));
job.setNumReduceTasks(0);
setup( ), 加载缓存文件, 缓存小表数据
map( )端进行join
没有reduce( )端, 即省去了Shuffle
-
SemiJoin(半连接, 是前两种的结合, 适用于有一张是小表)
// 加载缓存数据
job.addCacheFile(new URI(“file:///e:/input/inputcache/pd.txt”));
setup( ), 加载缓存文件, 缓存小表数据
map( )端进行过滤, 如果大表的连接字段不在小表连接字段的集合中, 就提前过滤掉;
reduce( )里进行join
ReduceJoin的缺点:
- ReduceTask要占用资源且耗时
- 有Reduce时要经历Shuffle排序
坑:
- Hadoop为了更省内存, 对集合迭代器进行了优化;
reduce方法的迭代器values, for循环或者迭代器 遍历得到的value指向的那个对象内存一直被重用,
所有value用的同一块内存进行了覆盖, 导致了ReduceJoin得到的集合元素数据不断在覆盖前面的内容
所以需要自己BeanUtils.copyProperties进行拷贝后, 再添加到List集合里
-
Mapper中第一个参数必须是LongWritable或NullWritable, 不可以是IntWritable, 否则报类型转换异常。
因为LongWritable是文件行数, 默认它是大数据场景不能为IntWritable