MapReduce基础知识(个人总结)

2023-11-12

    声明: 1. 本文为我的个人复习总结, 并那种从零基础开始普及知识 内容详细全面, 言辞官方的文章
              2. 由于是个人总结, 所以用最精简的话语来写文章
              3. 若有错误不当之处, 请指出

Writable类型:

Java类型 Hadoop Writable类型
Boolean BooleanWritable
Byte ByteWritable
Int IntWritable
Float FloatWritable
Long LongWritable
Double DoubleWritable
String Text
Map MapWritable
Array ArrayWritable

工作机制:

MapTask工作机制:

  1. Read阶段: 通过用户编写的RecordReader, 从输入InputSplit中解析出一个个key-value
  2. Map阶段: 该节点主要是将解析出的key/value交给用户编写map( )函数处理, 并产生一系列新的key-value
  3. Collect收集阶段: 调用OutputCollector.collect( )输出结果。它会将生成的key/value分区(调用Partitioner), 写入环形缓冲区
  4. Shuffle的Spill阶段
  5. Shuffle的Combine阶段

ReduceTask工作机制:

  1. Copy阶段: ReduceTask拷贝MapTask计算出的数据; 并针对某一片数据, 如果其大小超过一定阈值, 则写到磁盘上, 否则直接放到内存中

  2. Sort阶段: 对所有数据全局进行一次归并排序

  3. Reduce阶段: reduce( )函数将计算结果输出

切片规则:

  • 切片时不考虑整体, 而是每个Block块都单独进行切片
  • 切片大小默认等于Block大小, 使得切片效果最佳 使小文件切片少一些
  • 一个切片会启动一个MapTask

几种不同的切片机制:

  1. FileInputFormat

    • 看剩余部分是否在1.1倍Block范围内,不超过则剩余部分就按一块来切; 所以切片最大为1.1倍Block

    • 计算切片大小的源码 computeSplitSize(Math.max(minSize,Math.min(maxSize,blockSize)))

      要是想调小切片大小, 就应该把maxSize值调小

      要是想调大切片大小, 就应该把minSize值调大

    则150M文件被切成128M+22M

  2. CombineTextInputFormat

    虚拟存储切片最大值设置: CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4m

    切片过程: setMaxInputSplitSize的最大值记住maxSize

    1. 虚拟存储过程(虚拟存储文件块切分):

      • <=maxSize时, 先逻辑上划为一个块
      • >maxSize && <=2*maxSize时, 逻辑上均分成2个块 ((防止出现太小切片; 这块太小了, 切了更小后可以和别的组进行合并成大的)
      • >2*maxSize时, 逻辑上以maxSize进行切块
    2. 切片过程(对虚拟存储文件块 小文件合并,然后切片):

      <maxSize时, 和下一个虚拟存储文件块进行合并, 形成一个切片

      >=maxSize时, 直接形成一个切片

Shuffle:

ReduceTask数量=0时, 不会执行Shuffle

环形缓冲区:

​ 默认大小100M, 达到80%开始溢写;

​ 这个缓冲区本质上是一个数组, 叫环形是因为两边分别存 索引 和 实际数据, 排序时是排索引, 溢写时的排序按索引去查找实际数据(我推测索引应该近似于key值, 去找实际的K-V)

  • 发生在Map之后, Reduce之前 combiner就是发生在这里的

  • 期间有3个排序:

    1. 发生在Map端: 在环形缓冲区里getPatrtition( ) 后, 溢写时按key的索引进行快排
    2. 发生在Map端: 溢写的小文件内的同一分区内有序, 接下来溢写小文件间取10个组队进行归并排序
    3. 发生在Reduce端: Reduce读取了Map端的文件, 由于Map端输出的文件大小>内存大小, 想要用小内存去排序 众多内部分区间内已有序的文件, 进行了借助磁盘的全局的归并排序(类似于1G内存对100G数据进行排序)

    然后在Reduce端: 同一个Reduce分区中的数据按key分组后, 就可以执行reduce方法了

    这些排序是默认发生的, 它是为了reduce方法的同key在一组的分组, 排序有利于分组的进行

优化:

  1. 自定义分区重写getPatrtition( ), 把热点key拼接随机值进行打散, 避免发生数据倾斜
  2. 增大环形缓冲区大小 或 提高溢写文件的阈值百分比(减小了溢写文件的个数, 后面归并方便些)
  3. 在不影响业务逻辑的前提下, 使用combiner预聚合, 减少Map到Reduce的数据传输
  4. 增大一次进行归并文件的个数
  5. 采用压缩, 减少磁盘IO和数据网络传输
  6. 提高Reduce端拉取Map端的文件个数

数据倾斜:

数据倾斜是热点数据造成的

数据倾斜有计算倾斜(reduce端数据分布不均)和存储倾斜(HBase的Region存储数据分布不均)

MR解决计算数据倾斜的方案:

  1. 在不影响业务的前提下, 提前在map进行预聚合combine, 减少传输的数据量

    map数量较多, 这点预聚合计算任务对map而言并不重

  2. 处理热点数据(key)

    • key为NULL时

      • 属于异常数据就提前过滤掉
      • 不属于异常数据就拼接随机值
    • key不为NULL时

      拼接随机值

    然后进行两阶段MR聚合:

    第一次MR带着随机值聚合一部分, 即局部聚合;

    第二次MR去掉随机值进行最终聚合 即全局聚合

    所谓的随机值, 并不是UUID完全随机, 因为那样第一个MR相当于没干任何聚合的活, 第二个MR拆掉后缀随机值后照样数据倾斜;
    应该是某一个区间内的随机值(如随机值%100), 当1亿个同key的数据%100 [0,99]分区进行聚合, 第二个MR去掉后缀随机值后只需要聚合的是这100个同key的数据, 任务量就很小了

  3. 实现自定义分区

    重写getPatrtition( ), 把热点key拼接随机值进行打散, 避免发生数据倾斜

  4. Join操作时使用MapJoin, 提前加载数据到内存,再用Mapper去执行join逻辑; 没有Reducer了就减少了数据倾斜发生的概率

    Mapper很多, 且key并不是按hash( )取模 决定放在哪一台机器的, 所以一般就不会数据倾斜
    Reducer很少, 且key按hash( )取模 决定放在哪一台机器, 容易产生数据倾斜

Map不是越多越好, 因为其本身也要占用资源, 启动也慢

Reduce不是越多越好:

  • 因为其本身也要占用资源, 启动也慢

  • 而且有多少个Reduce就会有多少个文件, 增大出现小文件的概率

**Map端的数据分布到Reduce端的各个Partition: **

默认的getPatition( ) 是hash(数据)%ReduceTask数量, 分区号从0开始算

如果继承Partition方法 自定义分区不合理, 可能出现以下状况:

以下getPatition( )数量, 是对从0开始递增数分区而言的, 找不到指定分区肯定报错

  1. ReduceTask数量>getPatition( )数量, 则会多产生几个空的输出文件part-r-000xx
  2. 1<ReduceTask数量<getPatition( )数量, 则会发生异常, 因为一部分分区数据无处放置
  3. ReduceTask数量=1, 则不管Map端输出多少个分区文件, 都会交给这个ReduceTask去执行
  4. ReduceTask数量=0时, 不会执行Shuffle

自定义Combiner: 继承Reducer, 重写reduce方法

自定义排序: 实现WritableComparable接口重写compareTo方法

排序分类:

在这里插入图片描述

三种join:

Map端和Reduce端的setup( )方法, 是初始化方法, 可以获取到Context从而获取到文件名

用flag字段 来标识着是哪张表

  1. ReduceJoin

    setup( )加载提取文件名,

    map( )根据不同文件名进行提取不同字段

    reduce( )里进行join

  2. MapJoin (适用于有一张是小表)

    // 加载缓存数据

    job.addCacheFile(new URI(“file:///e:/input/inputcache/pd.txt”));

    job.setNumReduceTasks(0);

    setup( ), 加载缓存文件, 缓存小表数据

    map( )端进行join

    没有reduce( )端, 即省去了Shuffle

  3. SemiJoin(半连接, 是前两种的结合, 适用于有一张是小表)

    // 加载缓存数据

    job.addCacheFile(new URI(“file:///e:/input/inputcache/pd.txt”));

    setup( ), 加载缓存文件, 缓存小表数据

    map( )端进行过滤, 如果大表的连接字段不在小表连接字段的集合中, 就提前过滤掉;

    reduce( )里进行join

ReduceJoin的缺点:

  • ReduceTask要占用资源且耗时
  • 有Reduce时要经历Shuffle排序

坑:

  1. Hadoop为了更省内存, 对集合迭代器进行了优化;

​ reduce方法的迭代器values, for循环或者迭代器 遍历得到的value指向的那个对象内存一直被重用,

​ 所有value用的同一块内存进行了覆盖, 导致了ReduceJoin得到的集合元素数据不断在覆盖前面的内容

​ 所以需要自己BeanUtils.copyProperties进行拷贝后, 再添加到List集合里

  1. Mapper中第一个参数必须是LongWritable或NullWritable, 不可以是IntWritable, 否则报类型转换异常。

    因为LongWritable是文件行数, 默认它是大数据场景不能为IntWritable

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

MapReduce基础知识(个人总结) 的相关文章

  • Hadoop NoSuchMethodError apache.commons.cli

    我在用着hadoop 2 7 2我用 IntelliJ 做了一个 MapReduce 工作 在我的工作中 我正在使用apache commons cli 1 3 1我把库放在罐子里 当我在 Hadoop 集群上使用 MapReduceJob
  • 从 HDFS 传出文件

    我想将文件从 HDFS 传输到另一台服务器的本地文件系统 该服务器不在 hadoop 集群中 而是在网络中 我本可以这样做 hadoop fs copyToLocal
  • 运行 Sqoop 导入和导出时如何找到最佳映射器数量?

    我正在使用 Sqoop 版本 1 4 2 和 Oracle 数据库 运行 Sqoop 命令时 例如这样 sqoop import fs
  • 将 Spark 添加到 Oozie 共享库

    默认情况下 Oozie 共享 lib 目录提供 Hive Pig 和 Map Reduce 的库 如果我想在 Oozie 上运行 Spark 作业 最好将 Spark lib jar 添加到 Oozie 的共享库 而不是将它们复制到应用程序
  • 更改 Hadoop 中的数据节点数量

    如何改变数据节点的数量 即禁用和启用某些数据节点来测试可扩展性 说得更清楚一点 我有4个数据节点 我想一一实验1 2 3 4个数据节点的性能 是否可以只更新名称节点中的从属文件 临时停用节点的正确方法 创建一个 排除文件 这列出了您想要删除
  • MongoDB 存储过程等效项

    我有一个包含商店列表的大型 CSV 文件 其中一个字段是邮政编码 我有一个名为 ZipCodes 的独立 MongoDB 数据库 它存储任何给定邮政编码的纬度和经度 在 SQL Server 中 我将执行一个名为 InsertStore 的
  • 公平调度器和容量调度器有什么区别?

    我是 Hadoop 世界的新手 想了解公平调度程序和容量调度程序之间的区别 另外我们什么时候应该使用每一个 请简单地回答一下 因为我在网上读了很多东西 但从中得到的不多 公平调度是一种为作业分配资源的方法 使得所有作业随着时间的推移平均获得
  • Mapreduce shuffle 阶段出现内存不足错误

    我在运行时遇到奇怪的错误类似字数统计映射缩减程序 我有一个包含 20 个从站的 hadoop 集群 每个从站都有 4 GB RAM 我将 Map 任务配置为 300MB 堆 Reduce 任务槽为 1GB 我每个节点有 2 个映射槽和 1
  • 使用 org.apache.hadoop/* 依赖项离线编译 sbt 时遇到的问题

    使用依赖于 org apache hadoop 包的 sbt 进行离线编译时遇到很多麻烦 一个简单的build sbt name Test version 1 0 scalaVersion 2 10 4 libraryDependencie
  • 覆盖hadoop中的log4j.properties

    如何覆盖hadoop中的默认log4j properties 如果我设置 hadoop root logger WARN console 它不会在控制台上打印日志 而我想要的是它不应该在日志文件中打印 INFO 我在 jar 中添加了一个
  • 如何有效地将数据从 Kafka 移动到 Impala 表?

    以下是当前流程的步骤 Flafka http blog cloudera com blog 2014 11 flafka apache flume meets apache kafka for event processing 将日志写入
  • 全部配对图表上的所有路径

    这可能是一个没有最佳解决方案的问题 假设我有一个有向图 不知道它是否有循环 循环检测将是这个问题的方面之一 给定一组顶点 可能是数百万个顶点 我需要计算给定图的所有唯一对之间的所有不同路径 没有重复顶点的路径 我该如何应对这种情况 让我们看
  • 如何对 RDD 进行分区

    我有一个文本文件 其中包含大量由空格分隔的随机浮动值 我正在将此文件加载到 scala 中的 RDD 中 这个RDD是如何分区的 另外 是否有任何方法可以生成自定义分区 以便所有分区都具有相同数量的元素以及每个分区的索引 val dRDD
  • 错误 hive.HiveConfig:无法加载 org.apache.hadoop.hive.conf.HiveConf。确保 HIVE_CONF _DIR 设置正确

    我正在尝试将数据从 sqoop 导入到 hive MySQL use sample create table forhive id int auto increment firstname varchar 36 lastname varch
  • 为什么 CouchDB 归约函数接收“键”作为参数

    使用 CouchDB 减少功能 function keys values rereduce 这被称为这样 reduce key1 id1 key2 id2 key3 id3 value1 value2 value3 false 问题1 将键
  • 如何用snappy解压hadoop的reduce输出文件尾?

    我们的 hadoop 集群使用 snappy 作为默认编解码器 Hadoop作业减少输出文件名就像part r 00000 snappy JSnappy 无法解压缩文件 bcz JSnappy 需要以 SNZ 开头的文件 归约输出文件以某种
  • 是否可以通过编写单独的mapreduce程序并行执行Hive查询?

    我问了一些关于提高 Hive 查询性能的问题 一些答案与映射器和减速器的数量有关 我尝试使用多个映射器和减速器 但在执行中没有看到任何差异 不知道为什么 可能是我没有以正确的方式做 或者我错过了其他东西 我想知道是否可以并行执行 Hive
  • Hadoop - 直接从 Mapper 写入 HBase

    我有一个 hadoop 作业 其输出应写入 HBase 我并不真正需要减速器 我想要插入的行类型是在映射器中确定的 如何使用 TableOutputFormat 来实现此目的 从所有示例中 我看到的假设是 reducer 是创建 Put 的
  • AWS EMR 引导操作为 sudo

    我需要更新 etc hosts适用于我的 EMR 集群 EMR AMI 4 3 中的所有实例 整个脚本无非就是 bin bash echo e ip1 uri1 gt gt etc hosts echo e ip2 uri2 gt gt e
  • Hadoop 作业:任务在 601 秒内无法报告状态

    在伪节点上运行 hadoop 作业时 任务失败并被杀死 错误 任务尝试 在 601 秒内无法报告状态 但同一个程序正在通过 Eclipse 运行 本地作业 任务 大约有 25K 个关键字 输出将是所有可能的组合 一次两个 即大约 25K 2

随机推荐

  • SFuzz: Slice-based Fuzzing for Real-Time Operating Systems

    原文地址 SFuzz Proceedings of the 2022 ACM SIGSAC Conference on Computer and Communications Security 源码地址 NSSL SJTU SFuzz gi
  • Python面向过程编程主要知识

    摘自尚学堂的python 人工智能课程 用于复习 python是一种解释型的 面向对象的语言 python的特点 1 可读性强 易修改 2 简介 关注业务本身 生产效率高 3 面向对象 4 免费和开源 5 可移植性和跨平台 python被编
  • spring框架学习之路(一)-入门基础(1)-IOC(控制反转)&DI(依赖注入)

    前言 我就是一小白程序猴 不懂什么高新技术 只是在学习过程中把自己遇到问题或者学到的新知识记录下来 第一给自己复习用 第二小白更懂小白的苦 自己是新手所以应该更了解在刚开始学习时哪些学起来有困难 也就避开了所谓的专家盲点 给后面入坑的人一点
  • 【深度学习】使用kaggle提供的免费GPU在线训练模型

    背景 自己电脑没有GPU 只能找找网上的平台来跑模型 但是又买不起服务器 只能使用免费的平台这样子 免费的在线平台 各大计算平台免费GPU资源总结 本文要介绍的就是第三个 虽然是国外的 但是不用翻墙就可以访问 每周免费30小时使用时长 显卡
  • JAVA操作共享文件夹文件、下载、读取(windows、Linux通用)

    一 导入包 maven中央仓库https mvnrepository com artifact org samba jcifs jcifs 1 3 3
  • 在C#中??和?分别是什么意思?

    在C 中 和 分别是什么意思 1 可空类型修饰符 引用类型可以使用空引用表示一个不存在的值 而值类型通常不能表示为空 例如 string str null 是正确的 int i null 编译器就会报错 为了使值类型也可为空 就可以使用可空
  • 背完这套Java面试八股文,自动解锁面试牛逼症被动技能

    前言 国内的互联网面试 恐怕是现存的 最接近科举考试的制度 很多人对八股文都嗤之以鼻 认为无法衡量出一个程序员的真是水平 还有一部分人则是深恶痛绝 因为实在太难背了 但是国内大环境如此 互联网IT行业的求职者太多了 如果考察的是清一溜的算法
  • Weex 项目总结

    在项目中 我觉得暂时有两个地方需要总结一下 一个是weex内部的数据请求 一个是原生方法得调用 数据请求 在PC端调试的话会有跨域问题 在手机端没有跨域问题 原生方法需要原生开发者根据 Weex文档 写一个module 再暴露出一个方法给前
  • 解决springboot 项目配置文件指定端口号没生效

    方法1 指定启动端口号8022 覆盖配置文件 SpringBootApplication public class FadadaApplication public static void main String args SpringAp
  • Unity使用mesh绘制模型

    基本概念 首先要知道模型是如何产生的 比如说我们在一个3 3的空间创建这样9个点 vector3 这9个点构成了我们模型的范围 三点成三角 三角呈面 然后由面绘制出体 用这种方法可以绘制我们想要的图形 理论转为实践 第一步 绘制点 先将刚才
  • Vue 路由守卫详细介绍与演示

    Vue 路由守卫是一种在 Vue js 应用程序中控制路由导航的机制 它允许你在路由变化前 后或在特定路由上执行代码 以便实现诸如权限控制 数据加载 页面切换动画等功能 在下面的介绍中 我将首先提供官方定义和通俗解释 然后详细介绍全局前置路
  • python练习题(十九):有一分数序列:2/1,3/2,5/3,8/5,13/8,21/13...求出这个数列的前n项之和

    题目 有一分数序列 2 1 3 2 5 3 8 5 13 8 21 13 求出这个数列的前n项之和 n int input 请输入求和项数 n sum 0 记录前n项和 a 1 分母 b 2 分子 for i in range n n su
  • MinStack 和MaxStack

    leetcode链接 包含min函数的stack 分析 利用一个LinkedList 链表存储数据 类似于链stack 还有数组stack 采用ArrayList存储 关于如何查找最小元素的情况 思路一 双stack stack 保存正常的
  • git stash的用法

    首先 git stash的含义是将修改的代码先暂存起来 让本地仓库回到最后一次提交时的状态 便于代码的更新管理 主要避免修改文件与最新代码的冲突 最近项目中遇到一些文件修改了 暂时不想提交 就想到了使用stash命令 首先 可以将自己想提交
  • 学习笔记整理:网络应用技术-运输层(1)

    以下内容为个人的学习笔记整理 如有错误 请指出 谢谢 一 课前预习 1 数据交换有哪几种方式 电路交换 报文交换 分组交换 2 运输层实现的通信是什么之间的通信 两个网络应用程序之间的通信 3 运输层所说的端口有什么作用 什么是套接字 端口
  • 详解基于tensorflow实现对cifar100的识别,准确率达到65%附完整代码(涉及vggnet,resnet,,loss图像处理,图像增强,BN)

    文章目录 一 介绍cifar 数据集 二 resnet网络简介 a 网络结构图 b 使用resnet进行炼丹 c 第一次炼丹 d 第二次炼丹 完整代码 jupyter notebook 三 vggnet网络简介 a vggnet结构图 b
  • vue-cli+express前后端分离项目跨域问题解决

    1 express后端项目中使用命令npm i cors S安装cors 并在app js文件中引入cors 写下如下几行代码 var cors require cors 跨域 app use cors origin http localh
  • Java集合-HashMap1.8也会发生死循环

    在网上搜资料时候然后发现网上都说1 7版本的HashMap会发生死链也就是死循环 但是在HashMap中也会产生死循环 接下来直接看代码吧 代码 类名字我忘记改了这是我以前看park时候弄的但是这不重要 当你运行 public class
  • [第16课]统计:诸方差公式

    Start 观看可汗视频 本节课 可汗老师对原始方差公式进行推导 得出如下更简洁的公式 2 i
  • MapReduce基础知识(个人总结)

    声明 1 本文为我的个人复习总结 并非那种从零基础开始普及知识 内容详细全面 言辞官方的文章 2 由于是个人总结 所以用最精简的话语来写文章 3 若有错误不当之处 请指出 Writable类型 Java类型 Hadoop Writable类