MapReduce之二次排序

2023-05-16

目录

应用场景

什么是二次排序

怎样实现二次排序       

示例代码


应用场景

假如输入文件内容如下:

a,1
z,3
b,2
a,100
a,3
b,1

要求经过MapReduce处理后,key升序排列,相同key的vaule也升序排列,如下:

a,1
a,3,
a,100
b,1
b,2
z,3

什么是二次排序

二次排序是指我们对key进行排序后,同时也需要对value中的某个字段进行排序。实现二次排序的关键在于将初始的key与待排序字段组合成自定义类型的数据类型,将其作为新的key,利用mapreduce自动对key进行排序的原理,完成二次排序。在上面的示例输入中,初始key为字母列,待排序字段为数字列。

怎样实现二次排序       

由初始key字段与待排序字段组成的自定义的数据类型需要实现WritableComparable接口,WritableComparable接口继承自Writable接口和Comparable接口。Writable接口主要是用来实现序列化和反序列化,Comparable是Java中用于比较的接口。

在自定义的数据类型中,需要将初始key和待排序字段分别定义为变量,添加构造方法和get()/set()方法,同时重写序列化和反序列的方法,注意方法中数据类型要一致,最后重写比较方法。

         将上面自定义好的数据类型作为map输出的key,value还是初始的value,并输出。示例内容中,经过此步处理后,Map输出内容为

((a,1),1)
((z,3),3)
((b,2),2)
((a,100),100)
((a,3),3)
((b,1),1)


​

Map输出之后在reduce之前,还需要对map端输出的值进行处理,如下:

1,分区(partition)。因为使用了组合key作为新的key,如果还用之前的默认分区方法,在存在多个reduce  时,会将数据分散开,不符合要求,所以要 需要使用自定义的分区,然后按照初始的key进行区分,这样才能使结果符合要求。

     自定义分区类需要继承Partitioner类,重写getPartition方法;在Job中通过setPartitionerClass设置使用自定义的分区类。

2,分组(group),需要使用自定义group来处理我们需要的key,按照组合key中第一个字段,即初始key进行分组,这样得到的数据就是有序而且全部的.

       自定义分组类需要实现原生的RawComparator接口,RawComparator是一个原生的优化接口类,它只是简单的提供了数据流中的简单数据比较方法,此接口并没有被多数的衍生类所实现,最常用的实现类为WritableComparator,多数情况下是作为实现Writable接口的类的内部类,提供序列化字节的比较。RawComparator有两个比较方法,一个是对象间的比较,一个是字节数组的比较。

示例代码

(1)     创建map类,使用自定义数据类型作为输出结果的key,并实现map方法 ,设置组合key的值写入到context中


public static class SecondarySortMapper extends

          Mapper<LongWritable,Text,PairWritable,IntWritable>{}

/**......省略....**/
String lineValue = value.toString();

String[] strs = lineValue.split(",") ;

PairWritable mapOutputKey = new PairWritable ();

mapOutputKey.set(strs[0], Integer.valueOf(strs[1]));

mapOutputValue.set(Integer.valueOf(strs[1]));
/**.....省略.....**/

context.write(mapOutputKey, mapOutputValue);

(2)     创建reduce类,使用自定义数据类型作为输入的key,在reduce方法中,将输入key的第一个字段值,即初始key作为输出结果的key,循环输入的列表,将完成排序的key/value输出到上下文中。


/**部分示例代码**/
public static class SecondarySortReducer extends

          Reducer<PairWritable,IntWritable,Text,IntWritable>{}

PairWritable  outputKey = new outputKey();

outputKey.set(key.getFirst());

     for(IntWritable value : values){

          context.write(outputKey, value);

     }

(3)     设置job类,注意map输出类型和reduce类型均为自定义数据类型

/**部分示例代码**/
Job job = Job.getInstance(configuration, this.getClass().getSimpleName());

job.setJarByClass(this.getClass());

job.setMapperClass(SecondarySortMapper.class);

job.setMapOutputKeyClass(PairWritable.class);

job.setMapOutputValueClass(IntWritable.class);

job.setReducerClass(SecondarySortReducer.class);

job.setOutputKeyClass(IntWritable.class);

job.setOutputValueClass(IntWritable.class);

/**设置分区类**/
job.setPartitionerClass(FirstPartitioner.class);

/**设置分组类**/
job.setGroupingComparatorClass(FirstGroupingComparator.class);

(4)     实现自定义数据类型PairWritable,实现WritableComparable接口,

public class PairWritable implements WritableComparable<PairWritable> {

     private String first;
     private int second;

     public PairWritable() { }

     public PairWritable(String first, int second) {
          this.set(first, second);
     }

     public void set(String first, int second) {
          this.setFirst(first);
          this.setSecond(second);
     }

     public String getFirst() {
          return first;
     }

     public void setFirst(String first) {
          this.first = first;
     }

/**Get和set方法都用到了Integer的最大值,这是一种保持数据同为正数或同为负数的常用方法,避免出现正数和负数进行比较的情况**/
     public int getSecond() {
          return second - Integer.MAX_VALUE;

     }

     public void setSecond(int second) {
          this.second = second + Integer.MAX_VALUE;
     }

/**序列化和反序列化的方法,注意数据类型要前后对应**/
     public void write(DataOutput out) throws IOException {
          out.writeUTF(first);
          out.writeInt(second);

     }

     public void readFields(DataInput in) throws IOException {
          this.first = in.readUTF();
          this.second = in.readInt();
     }

/**比较对象的大小,返回结果为int类型,先对第一个字段进行比较,如果相同,继续比较第二个字段**/
     public int compareTo(PairWritable o) {
          int comp =this.first.compareTo(o.getFirst()) ;
        /**如果不相等**/
          if(0 != comp){
               return comp ;
          }

       /**相等**/
          return Integer.valueOf(this.getSecond()).compareTo(Integer.valueOf(o.getSecond())) ;

     }
}

(5)     自定义分组类FirstPartitioner,如果第一个字段相同则分为一组

/**部分示例代码**/    
public class FirstPartitioner extends Partitioner<PairWritable,IntWritable> {

     @Override
     public int getPartition(PairWritable key, IntWritable value,
               int numPartitions) {

   /**使用哈希码进行分组**/
          return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
     }
}

(6)     自定义分组类FirstGroupingComparator,如果第一个字段相同则为一组
  

/**部分示例代码**/
 public class FirstGroupingComparator implements RawComparator<PairWritable> {

           /**比较对象值,我们需要的是对组合key中的第一个字段进行比较**/

     public int compare(PairWritable o1, PairWritable o2) {
          return o1.getFirst().compareTo(o2.getFirst());
     }

     /**比较字节数组,因为我们输出类型为int,占4个字节,所以用数组总长度l减去4,即为需要的字节长度**/
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
          return WritableComparator.compareBytes(b1, 0, l1 - 4, b2, 0, l2 - 4);
     }

}

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

MapReduce之二次排序 的相关文章

  • 如何从mapreduce中的reducer输出中删除r-00000扩展

    我能够正确重命名我的减速器输出文件 但 r 00000 仍然存在 我在我的减速器类中使用了 MultipleOutputs 这是详细信息 不确定我缺少什么或我需要做什么额外的事情 public class MyReducer extends
  • 流数据和 Hadoop? (不是 Hadoop 流)

    我想使用 MapReduce 方法分析连续的数据流 通过 HTTP 访问 因此我一直在研究 Apache Hadoop 不幸的是 Hadoop 似乎希望以固定大小的输入文件开始作业 而不是在新数据到达时将其传递给消费者 事实确实如此 还是我
  • Hadoop MapReduce 提供嵌套目录作为作业输入

    我正在从事一项处理嵌套目录结构的工作 其中包含多个级别的文件 one three four baz txt bleh txt foo txt two bar txt gaa txt 当我添加one 作为输入路径 不会处理任何文件 因为没有文
  • MongoDB 从两个数组计算值、排序和限制

    我有一个存储浮点数组的 MongoDB 数据库 假设以下格式的文档集合 id 0 vals 0 8 0 2 0 5 有一个查询数组 例如 带有值 0 1 0 3 0 4 我想计算集合中所有元素的距离 例如 差异之和 对于给定的文档和查询 它
  • 仅使用一个映射器的 Hadoop gzip 输入文件[重复]

    这个问题在这里已经有答案了 可能的重复 为什么 hadoop 不能分割一个大文本文件 然后使用 gzip 压缩分割的内容 https stackoverflow com questions 6511255 why cant hadoop s
  • 如何具体确定MRJob中每个map步骤的输入?

    我正在从事一项地图缩减工作 包含多个步骤 使用 mrjob 每个步骤都会接收上一步的输出 问题是我不想这样 我想要的是提取一些信息并在第二步中针对所有输入等使用它 可以使用 mrjob 来做到这一点吗 Note 因为我不想使用emr 这个问
  • 如何处理 YARN MapReduce 作业的容器故障?

    YARN 中如何处理软件 硬件故障 具体来说 如果容器发生故障 崩溃 会发生什么 容器和任务失败由节点管理器处理 当容器失败或死亡时 节点管理器会检测到失败事件并启动一个新容器来替换失败的容器并在新容器中重新启动任务执行 如果应用程序主机发
  • 映射减少计数示例

    我的问题是关于mapreduce programming in java 假设我有 WordCount java 示例 一个标准mapreduce program 我希望map函数收集一些信息 并返回形成如下的reduce函数map
  • mongodb 聚合随机化(shuffle)结果

    我正在浏览一堆 mongo 文档 但找不到洗牌或随机化结果内容的可能性 有没有 特别是对于聚合框架本身来说 实际上并没有任何本地方法 因为还没有可用的运算符来执行诸如生成随机数之类的操作 因此 无论您可能投射一个字段进行排序的任何匹配 都不
  • 将多个前缀行过滤器设置为扫描仪 hbase java

    我想创建一台扫描仪 它可以为我提供带有 2 个前缀过滤器的结果例如 我想要其键以字符串 x 开头或以字符串 y 开头的所有行 目前我知道只能使用一个前缀 方法如下 scan setRowPrefixFilter prefixFiltet 在
  • Spark scala - 按数组列分组[重复]

    这个问题在这里已经有答案了 我对 Spark Scala 很陌生 感谢你的帮助 我有一个数据框 val df Seq a a1 Array x1 x2 a b1 Array x1 a c1 Array x2 c c3 Array x2 a
  • hadoop中reducer的数量

    我正在学习hadoop 我发现减速器的数量非常令人困惑 1 reducer的数量与partition的数量相同 2 reducer 的数量是 0 95 或 1 75 乘以 节点数 每个节点的最大容器数 3 减速机数量设定为mapred re
  • 在映射器的单个输出上运行多个减速器

    我正在使用地图缩减实现左连接功能 左侧有大约 6 亿条记录 右侧有大约 2300 万条记录 在映射器中 我使用左连接条件中使用的列来创建键 并将键值输出从映射器传递到减速器 我遇到性能问题 因为两个表中的值数量都很高的映射器键很少 例如分别
  • Riak 在 MapReduce 查询中失败。使用哪种配置?

    我正在与 riak riak js 结合开发一个 nodejs 应用程序 并遇到以下问题 运行此请求 db mapreduce add logs run 正确返回存储在存储桶日志中的所有 155 000 个项目及其 ID logs 1GXt
  • 运行时异常:java.lang.NoSuchMethodException:tfidf$Reduce.()

    如何解决这个问题 tfidf是我的主类 为什么运行jar文件后会出现这个错误 java lang RuntimeException java lang NoSuchMethodException tfidf Reduce
  • 运行 Sqoop 导入和导出时如何找到最佳映射器数量?

    我正在使用 Sqoop 版本 1 4 2 和 Oracle 数据库 运行 Sqoop 命令时 例如这样 sqoop import fs
  • Java8:使用 Stream / Map-Reduce / Collector 将 HashMap 转换为 HashMap

    我知道如何 改造 一个简单的JavaList from Y gt Z i e List
  • MongoDB/PyMongo:如何在 Map 函数中使用点表示法?

    我正在尝试计算每个邮政编码中找到的记录数 在我的 MongoDB 中 嵌入了邮政编码 使用点表示法 它位于 a res z a 代表地址 res 代表住宅 z 代表邮政编码 例如 这工作得很好 db NY count a res z 141
  • 为什么 Spark 没有使用本地计算机上的所有核心

    当我在 Spark Shell 中或作为作业运行一些 Apache Spark 示例时 我无法在单台计算机上实现完全的核心利用率 例如 var textColumn sc textFile home someuser largefile t
  • 从机上的 DiskErrorException - Hadoop 多节点

    我正在尝试处理来自 hadoop 的 XML 文件 在对 XML 文件调用字数统计作业时出现以下错误 13 07 25 12 39 57 INFO mapred JobClient Task Id attempt 201307251234

随机推荐

  • 结构程序设计的经典定义

    结构程序设计的经典定义如下所述 xff1a 如果一个程序的 代码块仅仅通过顺序 选择和循环这3种基本控制结构 进行连接 xff0c 并且每个代码块只有一个入口和一个出口 xff0c 则称这个程序是结构化的 如果只允许使用顺序 IF THEN
  • 总体设计启发性规则7条

    nbsp 启发性规则 7条 1 改进软件结构提高模块独立性 通过模块分解或合并 降低耦 合提高内聚 2 模块规模应该适中 过大的模块往往是由于分解不充分 过小 的模块将导致模块数目过多将使系统接口复杂 3 深度 宽度 扇出和扇入都应适当 深
  • SqlServer调用webapi和webService接口

    1 通过http协议post调用webapi接口 xff08 json数据格式 xff09 declare 64 ServiceUrl as varchar 1000 set 64 ServiceUrl 61 39 http 127 0 0
  • C语言和C++的区别是什么?8个点通俗易懂的告诉你

    有些初学的同学傻傻分不清其中的区别 xff0c 下面我将详细的讲解C语言和C 43 43 的区别点 帮助大家尽快的理解 1 关键字 蓝色标注为C语言关键字 xff0c C 43 43 继承了C语言的所有关键字 xff0c 以下红色标注为C
  • money 最小花费(spfa)

    问题描述 在n个人中 xff0c 某些人的银行账号之间可以互相转账 这些人之间转账的手续费各不相同 给定这些人之间转账时需要从转账金额里扣除百分之几的手续费 xff0c 请问A最少需要多少钱使得转账后B收到100元 输入格式 第一行输入两个
  • cpu优化-cpu亲和性

    cpu亲和性 taskset命令可以将进程绑核 格式为taskset p c cpu list pid xff0c 其中cpu list是数字化的cpu列表 xff0c 从0开始 多个不连续的cpu可用逗号连接 xff0c 连续的可用 连接
  • Homebrew安装慢,总是失败如何解决

    前言 如果使用Homebrew xff08 https brew sh xff09 官网提供的命令来进行下载的 xff0c 是从github上进行下载 xff0c 没有翻墙工具的话会比较慢 xff0c 甚至可能会下载失败 xff0c 所以我
  • 开源飞控APM与PIXHAWK

    一 APM 官网地址 xff1a http ardupilot org APM xff08 ArduPilotMega xff09 是在2007年由DIY无人机社区 xff08 DIY Drones xff09 推出的飞控产品 xff0c
  • Pixhawk解锁常见错误

    第一次解锁 xff0c 接上MP看着HUD的提示 xff0c 即飞行数据的界面 xff1a 一般的不成功解锁有以下的原因 xff08 网络整理 xff09 xff08 1 xff09 HUD显示 RC not calibrated xff1
  • 学习MySQL——单表查询

    文章目录 一 SQL语言规范二 基本的SELECT语句1 列的别名2 去除重复行3 空值参与运算4 着重号 96 96 5 显示表结构 三 运算符 比较运算符1 等号运算符 xff08 61 xff09 2 不等于运算符 xff08 lt
  • FreeRTOS-启动第一个任务

    FreeRTOS开始第一个任务源码分析 vTaskStartScheduler xff1a 1 创建一个空任务 xff1a 优先级为0 2 是否使用软件定时器 是的话 创建软件定时器 3 关闭中断 xff08 关中断操作的寄存器是BASEP
  • 我的2014个人总结——学习篇、工作篇、生活篇

    2013的个人总结在我印象当中是写过的 xff0c 2014已成为过去 xff0c 当我想回过头来看看我2013年的总结时 xff0c 奈何我已不知它的踪迹了 xff0c 所以决定以后的个人总结还是以博客的形式记录吧 xff01 平静下来
  • ubuntu vnc 已经配置好,一键开启,节省大家时间

    1 粘贴复制 自己找个目录复制过去 2 修改权限 sudo chmod 777 x0vncserver 3 开启 x0vncserver rfbport 61 5900 SecurityTypes 61 None 下载地址 xff1a ht
  • ArcGIS Engine许可突然用不了了或者localhost没有有效的许可管理器

    ArcGIS Engine许可突然用不了了或者localhost没有有效的许可管理器 在Arc Engine安装包中再重新安装一次许可管理
  • ZeroMQ学习 (五)发布-订阅模式

    7 发布 订阅模式 发布者不用管是否有订阅者 xff0c 它只管不停的发布 xff0c 也不用接受客户端的请求 多订阅者可以尝试链接发布者 xff0c 来接受信息 xff0c 但是不能往发布者发送请求 发布者源码 xff1a span st
  • 树莓派3B+安装Ubuntu20.04

    ros2已经出到F版本了 xff0c 本来想下载一个尝鲜一下 xff0c 怒肝了两天终于把Ubuntu20 04装到树莓派里面了 但是 xff0c 忽然发现F版本的还未发行 xff0c 只是在网站上更新了安装方法 xff0c 安装包和公钥都
  • 【SpringBoot】SpringBoot+SpringSecurity+CAS实现单点登录

    文章目录 一 CAS的概述1 SSO2 CAS3 概念 二 CAS的流程三 CAS服务端部署1 下载地址2 源码打包3 部署运行4 java io FileNotFoundException etc cas thekeystore 系统找不
  • Google Scholar 谷歌学术文献检索技巧总结

    原文链接 xff1a https zhuanlan zhihu com p 24369927 身边有朋友想学如何使用谷歌学术 xff0c 为了更广泛的传播和重复查阅 xff0c 故将个人了解到的谷歌学术检索文献的技巧总结在此 当然 xff0
  • kubernetes flannel pod CrashLoopBackoff解决

    背景 某环境客户部署了一个kubernetes集群 xff0c 发现flannel的pod一直重启 xff0c 始终处于CrashLoopBackOff状态 排查 对于始终CrashLoopBackOff的pod xff0c 一般是应用本身
  • MapReduce之二次排序

    目录 应用场景 什么是二次排序 怎样实现二次排序 示例代码 应用场景 假如输入文件内容如下 xff1a a 1 z 3 b 2 a 100 a 3 b 1 要求经过MapReduce处理后 xff0c key升序排列 xff0c 相同key