Mapreduce实例(五):二次排序

2023-05-16

MR 实现 二次排序

  • 实现思路
  • 代码实现
    • 自定义key的代码:
    • 分区函数类代码
    • 分组函数类代码
    • Map代码:
    • Reduce代码:
    • 完整代码:

大家好,我是风云,欢迎大家关注我的博客 或者 公号【笑看风云路】,在未来的日子里我们一起来学习大数据相关的技术,一起努力奋斗,遇见更好的自己!

实现思路

在Map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReader的实现。本实验中使用的是TextInputFormat,他提供的RecordReader会将文本的字节偏移量作为key,这一行的文本作为value。这就是自定义Map的输入是<LongWritable, Text>的原因。然后调用自定义Map的map方法,将一个个<LongWritable, Text>键值对输入给Map的map方法。注意输出应该符合自定义Map中定义的输出<IntPair, IntWritable>。最终生成一个List<IntPair, IntWritable>。在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次排序。 如果没有通过job.setSortComparatorClass设置key比较函数类,则可以使用key实现的compareTo方法进行排序。 在本实验中,就使用了IntPair实现的compareTo方法。

在Reduce阶段,reducer接收到所有映射到这个reducer的map输出后,也是会调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序。然后开始构造一个key对应的value迭代器。这时就要用到分组,使用job.setGroupingComparatorClass设置的分组函数类只要这个比较器比较的两个key相同,他们就属于同一个组,它们的value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。最后就是进入Reducer的reduce方法,reduce方法的输入是所有的(key和它的value迭代器)。同样注意输入与输出的类型必须与自定义的Reducer中声明的一致。

代码实现

​ 二次排序:在mapreduce中,所有的key是需要被比较和排序的,并且是二次,先根据partitioner,再根据大小。而本例中也是要比较两次。先按照第一字段排序,然后在第一字段相同时按照第二字段排序。根据这一点,我们可以构造一个复合类IntPair,他有两个字段,先利用分区对第一字段排序,再利用分区内的比较对第二字段排序。Java代码主要分为四部分:自定义key,自定义分区函数类,map部分,reduce部分。

自定义key的代码:

public static class IntPair implements WritableComparable<IntPair>  
  {  
    int first;  //第一个成员变量  
    int second;  //第二个成员变量  
  
    public void set(int left, int right)  
    {  
      first = left;  
      second = right;  
    }  
    public int getFirst()  
    {  
      return first;  
    }  
    public int getSecond()  
    {  
      return second;  
    }  
    @Override  
    //反序列化,从流中的二进制转换成IntPair  
    public void readFields(DataInput in) throws IOException  
    {  
      // TODO Auto-generated method stub  
      first = in.readInt();  
      second = in.readInt();  
    }  
    @Override  
    //序列化,将IntPair转化成使用流传送的二进制  
    public void write(DataOutput out) throws IOException  
    {  
      // TODO Auto-generated method stub  
      out.writeInt(first);  
      out.writeInt(second);  
    }  
    @Override  
    //key的比较  
    public int compareTo(IntPair o)  
    {  
      // TODO Auto-generated method stub  
      if (first != o.first)  
      {  
        return first < o.first ? 1 : -1;  
      }  
      else if (second != o.second)  
      {  
        return second < o.second ? -1 : 1;  
      }  
      else  
      {  
        return 0;  
      }  
    }  
    @Override  
    public int hashCode()  
    {  
      return first * 157 + second;  
    }  
    @Override  
    public boolean equals(Object right)  
    {  
      if (right == null)  
        return false;  
      if (this == right)  
        return true;  
      if (right instanceof IntPair)  
      {  
        IntPair r = (IntPair) right;  
        return r.first == first && r.second == second;  
      }  
      else  
      {  
        return false;  
      }  
    }  
  }

所有自定义的key应该实现接口WritableComparable,因为是可序列的并且可比较的,并重载方法。该类中包含以下几种方法:
1.反序列化,从流中的二进制转换成IntPair 方法为public void readFields(DataInput in) throws IOException
2.序列化,将IntPair转化成使用流传送的二进制 方法为public void write(DataOutput out)
3. key的比较 public int compareTo(IntPair o) 另外新定义的类应该重写的两个方法 public int hashCode() 和public boolean equals(Object right)

分区函数类代码

 public static class FirstPartitioner extends Partitioner<IntPair, IntWritable>  
  {  
    @Override  
    public int getPartition(IntPair key, IntWritable value,int numPartitions)  
    {  
      return Math.abs(key.getFirst() * 127) % numPartitions;  
    }  
  }  

对key进行分区,根据自定义key中first乘以127取绝对值再对numPartions取余来进行分区。这主要是为实现第一次排序

分组函数类代码

 public static class GroupingComparator extends WritableComparator  
  {  
    protected GroupingComparator()  
    {  
      super(IntPair.class, true);  
    }  
    @Override  
    //Compare two WritableComparables.  
    public int compare(WritableComparable w1, WritableComparable w2)  
    {  
      IntPair ip1 = (IntPair) w1;  
      IntPair ip2 = (IntPair) w2;  
      int l = ip1.getFirst();  
      int r = ip2.getFirst();  
      return l == r ? 0 : (l < r ? -1 : 1);  
    }  
  }

分组函数类。在reduce阶段,构造一个key对应的value迭代器的时候,只要first相同就属于同一个组,放在一个value迭代器。这是一个比较器,需要继承WritableComparator

Map代码:

  public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>  
  {  
    //自定义map  
    private final IntPair intkey = new IntPair();  
    private final IntWritable intvalue = new IntWritable();  
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException  
    {  
      String line = value.toString();  
      StringTokenizer tokenizer = new StringTokenizer(line);  
      int left = 0;  
      int right = 0;  
      if (tokenizer.hasMoreTokens())  
      {  
        left = Integer.parseInt(tokenizer.nextToken());  
        if (tokenizer.hasMoreTokens())  
          right = Integer.parseInt(tokenizer.nextToken());  
        intkey.set(right, left);  
        intvalue.set(left);  
        context.write(intkey, intvalue);  
      }  
    }  
  }

Reduce代码:

public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable>  
  {  
    private final Text left = new Text();  
    private static final Text SEPARATOR = new Text("------------------------------------------------");  
  
    public void reduce(IntPair key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException  
    {  
      context.write(SEPARATOR, null);  
      left.set(Integer.toString(key.getFirst()));  
      System.out.println(left);  
      for (IntWritable val : values)  
      {  
        context.write(left, val);  
        //System.out.println(val);  
      }  
    }  
  }

完整代码:

 package mapreduce;  
  import java.io.DataInput;  
  import java.io.DataOutput;  
  import java.io.IOException;  
  import java.util.StringTokenizer;  
  import org.apache.hadoop.conf.Configuration;  
  import org.apache.hadoop.fs.Path;  
  import org.apache.hadoop.io.IntWritable;  
  import org.apache.hadoop.io.LongWritable;  
  import org.apache.hadoop.io.Text;  
  import org.apache.hadoop.io.WritableComparable;  
  import org.apache.hadoop.io.WritableComparator;  
  import org.apache.hadoop.mapreduce.Job;  
  import org.apache.hadoop.mapreduce.Mapper;  
  import org.apache.hadoop.mapreduce.Partitioner;  
  import org.apache.hadoop.mapreduce.Reducer;  
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
  public class SecondarySort  
  {  
  
    public static class IntPair implements WritableComparable<IntPair>  
    {  
      int first;  
      int second;  
  
      public void set(int left, int right)  
      {  
        first = left;  
        second = right;  
      }  
      public int getFirst()  
      {  
        return first;  
      }  
      public int getSecond()  
      {  
        return second;  
      }  
      @Override  
  
      public void readFields(DataInput in) throws IOException  
      {  
        // TODO Auto-generated method stub  
        first = in.readInt();  
        second = in.readInt();  
      }  
      @Override  
  
      public void write(DataOutput out) throws IOException  
      {  
        // TODO Auto-generated method stub  
        out.writeInt(first);  
        out.writeInt(second);  
      }  
      @Override  
  
      public int compareTo(IntPair o)  
      {  
        // TODO Auto-generated method stub  
        if (first != o.first)  
        {  
          return first < o.first ? 1 : -1;  
        }  
        else if (second != o.second)  
        {  
          return second < o.second ? -1 : 1;  
        }  
        else  
        {  
          return 0;  
        }  
      }  
      @Override  
      public int hashCode()  
      {  
        return first * 157 + second;  
      }  
      @Override  
      public boolean equals(Object right)  
      {  
        if (right == null)  
          return false;  
        if (this == right)  
          return true;  
        if (right instanceof IntPair)  
        {  
          IntPair r = (IntPair) right;  
          return r.first == first && r.second == second;  
        }  
        else  
        {  
          return false;  
        }  
      }  
    }  
  
    public static class FirstPartitioner extends Partitioner<IntPair, IntWritable>  
    {  
      @Override  
      public int getPartition(IntPair key, IntWritable value,int numPartitions)  
      {  
        return Math.abs(key.getFirst() * 127) % numPartitions;  
      }  
    }  
    public static class GroupingComparator extends WritableComparator  
    {  
      protected GroupingComparator()  
      {  
        super(IntPair.class, true);  
      }  
      @Override  
      //Compare two WritableComparables.  
      public int compare(WritableComparable w1, WritableComparable w2)  
      {  
        IntPair ip1 = (IntPair) w1;  
        IntPair ip2 = (IntPair) w2;  
        int l = ip1.getFirst();  
        int r = ip2.getFirst();  
        return l == r ? 0 : (l < r ? -1 : 1);  
      }  
    }  
    public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>  
    {  
      private final IntPair intkey = new IntPair();  
      private final IntWritable intvalue = new IntWritable();  
      public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException  
      {  
        String line = value.toString();  
        StringTokenizer tokenizer = new StringTokenizer(line);  
        int left = 0;  
        int right = 0;  
        if (tokenizer.hasMoreTokens())  
        {  
          left = Integer.parseInt(tokenizer.nextToken());  
          if (tokenizer.hasMoreTokens())  
            right = Integer.parseInt(tokenizer.nextToken());  
          intkey.set(right, left);  
          intvalue.set(left);  
          context.write(intkey, intvalue);  
        }  
      }  
    }  
  
    public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable>  
    {  
      private final Text left = new Text();  
      private static final Text SEPARATOR = new Text("------------------------------------------------");  
  
      public void reduce(IntPair key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException  
      {  
        context.write(SEPARATOR, null);  
        left.set(Integer.toString(key.getFirst()));  
        System.out.println(left);  
        for (IntWritable val : values)  
        {  
          context.write(left, val);  
          //System.out.println(val);  
        }  
      }  
    }  
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException  
    {  
  
      Configuration conf = new Configuration();  
      Job job = new Job(conf, "secondarysort");  
      job.setJarByClass(SecondarySort.class);  
      job.setMapperClass(Map.class);  
      job.setReducerClass(Reduce.class);  
      job.setPartitionerClass(FirstPartitioner.class);  
  
      job.setGroupingComparatorClass(GroupingComparator.class);  
      job.setMapOutputKeyClass(IntPair.class);  
  
      job.setMapOutputValueClass(IntWritable.class);  
  
      job.setOutputKeyClass(Text.class);  
  
      job.setOutputValueClass(IntWritable.class);  
  
      job.setInputFormatClass(TextInputFormat.class);  
  
      job.setOutputFormatClass(TextOutputFormat.class);  
      String[] otherArgs=new String[2];  
      otherArgs[0]="hdfs://localhost:9000/mymapreduce8/in/goods_visit2";  
      otherArgs[1]="hdfs://localhost:9000/mymapreduce8/out";  
  
      FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));  
  
      FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  
  
      System.exit(job.waitForCompletion(true) ? 0 : 1);  
    }  
  }

-------------- end ----------------

微信公众号:扫描下方二维码或 搜索 笑看风云路 关注
笑看风云路

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Mapreduce实例(五):二次排序 的相关文章

  • security.UserGroupInformation:MR 的 PrivilegedgedActionException 错误

    每当我尝试执行映射缩减作业以写入 Hbase 表时 我都会在控制台中收到以下错误 我正在从用户帐户运行 MR 作业 错误 security UserGroupInformation PriviledgedActionException 为
  • couchdb 视图使用另一个视图?

    我对 couchdb 中的视图有疑问 目前 我有许多视图 例如 view A view B view Z 对于每个视图 它们包含相同范围的键但具有不同的值 IE view A key key 1 value 10 key key 2 val
  • 在mongo中执行优先级查询

    样本文件 name John age 35 address join month 3 的员工优先级为 1 地址包含字符串 Avenue 的员工优先级为 2 地址包含字符串 Street 的员工优先级为 3 地址包含字符串 Road 的员工优
  • 如何从mapreduce中的reducer输出中删除r-00000扩展

    我能够正确重命名我的减速器输出文件 但 r 00000 仍然存在 我在我的减速器类中使用了 MultipleOutputs 这是详细信息 不确定我缺少什么或我需要做什么额外的事情 public class MyReducer extends
  • MongoDB 从两个数组计算值、排序和限制

    我有一个存储浮点数组的 MongoDB 数据库 假设以下格式的文档集合 id 0 vals 0 8 0 2 0 5 有一个查询数组 例如 带有值 0 1 0 3 0 4 我想计算集合中所有元素的距离 例如 差异之和 对于给定的文档和查询 它
  • Log4j RollingFileAppender 未将映射器和减速器日志添加到文件中

    我们希望将应用程序日志打印到本地节点上的文件中 我们使用 Log4j 的 RollingFileAppender Our log4j properties文件如下 ODS LOG DIR var log appLogs ODS LOG IN
  • Spark 无法再执行作业。执行器创建目录失败

    我们已经有一个小型 Spark 集群运行了一个月 它已经成功执行了作业 或者让我为该集群启动一个 Spark shell 无论我向集群提交作业还是使用 shell 连接到集群 错误总是相同的 root SPARK HOME bin spar
  • 两个相等的组合键不会到达同一个减速器

    我正在使用 MapReduce 框架用 Java 制作 Hadoop 应用程序 我仅使用文本键和值进行输入和输出 在减少最终输出之前 我使用组合器进行额外的计算步骤 但我有一个问题 钥匙没有进入同一个减速器 我在组合器中创建并添加键 值对
  • 如何处理 YARN MapReduce 作业的容器故障?

    YARN 中如何处理软件 硬件故障 具体来说 如果容器发生故障 崩溃 会发生什么 容器和任务失败由节点管理器处理 当容器失败或死亡时 节点管理器会检测到失败事件并启动一个新容器来替换失败的容器并在新容器中重新启动任务执行 如果应用程序主机发
  • Hadoop YARN 作业陷入映射 0% 并减少 0%

    我正在尝试运行一个非常简单的作业来测试我的 hadoop 设置 所以我尝试使用 Word Count Example 它陷入了 0 所以我尝试了一些其他简单的作业 并且每个作业都陷入了困境 52191 0003 14 07 14 23 55
  • 如何在hadoop/map reduce中创建固定行数的输出文件?

    假设我们有 N 个具有不同行数的输入文件 我们需要生成输出文件 使得每个输出文件恰好有 K 行 最后一个输出文件可以有 是否可以使用单个 MR 作业来完成此操作 我们应该打开文件以便在reducer中显式写入 输出中的记录应该被打乱 tha
  • 将多个前缀行过滤器设置为扫描仪 hbase java

    我想创建一台扫描仪 它可以为我提供带有 2 个前缀过滤器的结果例如 我想要其键以字符串 x 开头或以字符串 y 开头的所有行 目前我知道只能使用一个前缀 方法如下 scan setRowPrefixFilter prefixFiltet 在
  • Python - Map/Reduce - 如何在使用 DISCO 计数单词示例中读取 JSON 特定字段

    我正在按照 DISCO 示例来计算文件中的单词数 将单词数作为 Map Reduce 作业 http discoproject org doc disco start tutorial html 我对此工作没有任何问题 但是我想尝试从包含
  • 遍历 ArrayWritable - NoSuchMethodException

    我刚刚开始使用 MapReduce 并且遇到了一个奇怪的错误 我无法通过 Google 回答该错误 我正在使用 ArrayWritable 制作一个基本程序 但是当我运行它时 在Reduce过程中出现以下错误 java lang Runti
  • Sqoop - 绑定到 YARN 队列

    因此 使用 MapReduce v2 您可以使用绑定到某些 YARN 队列来管理资源和优先级 基本上通过使用 hadoop jar xyz jar D mapreduce job queuename QUEUE1 input output
  • 如何在 MapReduce 作业中导入自定义模块?

    我有一个 MapReduce 作业定义在main py 它导入了lib模块来自lib py 我使用 Hadoop Streaming 将此作业提交到 Hadoop 集群 如下所示 hadoop jar usr lib hadoop mapr
  • 无法在 Hadoop Map-Reduce 作业中加载 OpenNLP 句子模型

    我正在尝试将 OpenNLP 集成到 Hadoop 上的 Map Reduce 作业中 从一些基本的句子分割开始 在地图函数中 运行以下代码 public AnalysisFile analyze String content InputS
  • MongoDB - 使用聚合框架或 MapReduce 来匹配文档中的字符串数组(配置文件匹配)

    我正在构建一个可以比作约会应用程序的应用程序 我有一些结构如下的文档 db profiles find pretty id 1 firstName John lastName Smith fieldValues favouriteColou
  • MongoDB 存储过程等效项

    我有一个包含商店列表的大型 CSV 文件 其中一个字段是邮政编码 我有一个名为 ZipCodes 的独立 MongoDB 数据库 它存储任何给定邮政编码的纬度和经度 在 SQL Server 中 我将执行一个名为 InsertStore 的
  • MongoDB/PyMongo:如何在 Map 函数中使用点表示法?

    我正在尝试计算每个邮政编码中找到的记录数 在我的 MongoDB 中 嵌入了邮政编码 使用点表示法 它位于 a res z a 代表地址 res 代表住宅 z 代表邮政编码 例如 这工作得很好 db NY count a res z 141

随机推荐