自己完成一个简单的mapreduce程序

2023-05-16

hdfs上的数据源 search.txt,我们这里依然使用讲解hive案例的那个数据源,通过把hive执行的结果与我们自己写的mapreduce的结果作比较,来验证是否编码正确,以及能否正确理解mapreduce。数据源参考我之前的文章"hive实例--分析每个月的查询量",比较多,我就不粘贴了。

先定义StatisticsMapper类,继承并实现map方法即可:

package com.roadjava.hadooptest;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/*
 * map方法被map task调用,map task每读取一行文本来调用一次map方法 。
 * map方法调用完只会的输出类似如下格式(即一行一个1):
 * 2016-01 1
 * 2016-01 1
 * 2016-08 1
 */
public class StatisticsMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
	/**
	 * key:当前行相对于数据源开始位置的偏移量
	 * value:每一行数据
	 */
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
			throws IOException, InterruptedException {
		//value表示读取到的每一行原数据
        String line = value.toString();
        //切分value
        String[] lineArr = line.split("\\|");
    	String time = lineArr[0].trim().substring(0, 7);
    	//time类似2016-01
        context.write(new Text(time), new LongWritable(1));
	}
}

再定义StatisticsReduce类:

package com.roadjava.hadooptest;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/*
 *   * reduce方法提供给reduce task进程来调用 
 *   
     map--------> shuffle---------->reduce
 2016-01 1		2016-01 [1,1]
 2016-01 1		2016-08 [1,1,1]
 2016-08 1
 2016-08 1
 2016-08 1   
       
 */
public class StatisticsReduce extends 
	Reducer<Text, LongWritable, Text, LongWritable>{
	/**
	 *  key:对应mapper阶段输出的key类型 ,这里形如 2016-01
	 * v2s:对应shuffle阶段输出的value,形如 [1,1,1] 
	 */
	@Override
	protected void reduce(Text key, Iterable<LongWritable> v2s,
			Reducer<Text, LongWritable, Text, LongWritable>.Context context) 
					throws IOException, InterruptedException {
		//定义一个total用来统计当月有几个不同的查询内容,有几个即为当月查询次数  
        long total=0;
        //v2s当中存储 [1,1,1]  key:2016-01
       for(LongWritable next:v2s){
           total+=next.get();     
       }
       //输出<K3、V3>,比如<"hello", 5>
       context.write(key, new LongWritable(total));
	} 
}

从我注释的内容可以看到,mapreduce实际上有是三个过程:map--------> shuffle---------->reduce,那你会问了,为什么没见你重写shuffle方法呢?因为shuffle不是我们开发可以控制的,所谓mapreduce开发,其实最重要的工作就是重写map方法和reduce方法。因此别看例子小,流程走通才是最重要的。shuffle阶段主要负责处理map阶段的输出,针对map阶段的输出会做如下处理:

一、key相同的合并,怎么合并法呢?key自然不变,把一个个的value放到一个集合里面,在reduce阶段,你就可以使用这个集合迭代出这个key的所有value了。

二、排序,比方说按key从小到大排序

最后,配置一下mapreduce就行了。

package com.roadjava.hadooptest;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Statistics { 
	public static void main(String[] args) throws Exception {
        //把MapReduce作业抽象成Job对象了并提交
        Job job = Job.getInstance(new Configuration());
        //设置main方法所在的类 
        job.setJarByClass(Statistics.class);

        //接下来我们设置一下Job的Mapper相关属性
        job.setMapperClass(StatisticsMapper.class);//设置Mapper类
        job.setMapOutputKeyClass(Text.class);//设置K2的类型
        job.setMapOutputValueClass(LongWritable.class);//设置V2的类型
        //接下来我们得告诉程序我们应该去哪里读取文件。需要注意的是args[0]的值是在HDFS文件系统上的路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        //接下来我们来设置一下Job的Reducer相关属性
        job.setReducerClass(StatisticsReduce.class);//设置Reducer类
        job.setOutputKeyClass(Text.class);//设置K3的类型
        job.setOutputValueClass(LongWritable.class);//设置V3的类型
        //设置输出结果放在hdfs文件系统的哪个路径
       FileOutputFormat.setOutputPath(job, new Path(args[1])); 
     //把作业提交并且等待执行完成,参数为true的话,会打印进度和详情。
       job.waitForCompletion(true);
 }
}

导出这三个类所在的项目为statistics.jar,并上传到hadoop集群所在机器上,执行如下命令,以下命令用于运行你自己写的这个mapreduce程序:

[root@node113 ~]#hadoop  jar  statistics.jar  com.roadjava.hadooptest.Statistics  
/user/hive/warehouse/test111.db/t_searchword/search.txt  /output201815

查看运行结果:

image.png

与使用hive执行hql语句得到的结果一致,说明编写正确。

上面的例子中,Mapper的四个泛型参数是Mapper<LongWritable, Text, Text, LongWritable>,可能你在看到那篇教程之前,自己看过了wordcount这个经典的mapreduce程序案例,或许你有些许迷惑、又或许你已经形成了思维的定势,即Mapper的泛型参数只能是这个格式:LongWritable, Text, Text, LongWritable,mapreduce也只能用来统计单词出现个数或者每个月的查询量,那就大错特错了。我现在就稍微改动一下程序,希望能打破你这种思维定势。

StatisticsMapper2:

package com.roadjava.hadooptest2;

 

import java.io.IOException;

 

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

/*

 * map方法被map task调用,map task每读取一行文本来调用一次map方法 。

 * map方法调用完只会的输出类似如下格式(即一行一个1):

 * 2016-01 tomcat

 * 2016-01 cms

 * 2016-08 iframe

 * 

 * Mapper<K1,V1,K2,V2>,这里我把V2改为了Text类型,表示map执行完之后输出类型是Text,

 * 而不再是LongWritable

 */

public class StatisticsMapper2 extends 

Mapper<LongWritable, Text, Text, Text>{

/**

* key:当前行相对于数据源开始位置的偏移量

* value:每一行数据

*/

@Override

protected void map(LongWritable key, Text value, 

Mapper<LongWritable, Text, Text, Text>.Context context)

throws IOException, InterruptedException {

//value表示读取到的每一行原数据

        String line = value.toString();

        //切分value

        String[] lineArr = line.split("\\|");

    String time = lineArr[0].trim().substring(0, 7);

    String cont=lineArr[1].trim();

    //2016-01  tomcat

        context.write(new Text(time), new Text(cont));

}

}

StatisticsReduce2:

package com.roadjava.hadooptest2;

 

import java.io.File;

import java.io.IOException;

 

import org.apache.commons.io.FileUtils;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

/*

 *   * reduce方法提供给reduce task进程来调用 ,map过后执行shuffle

 ,shuffle会将相同的key对应的值排序后堆叠在一起,堆叠的结果就是形成了reduce的

 第二个参数iterable,针对经过shuffle之后的输出结果,每一组相同的key都会调用一次reduce

 (比如"2016-01 [tomcat,cms]"与“2016-08 [iframe,java内存溢出,慢]”分别调用

 2次reduce函数)

     map--------> shuffle---------->reduce

 2016-01 12016-01 [tomcat,cms]

 2016-01 12016-08 [iframe,java内存溢出,慢]

 2016-08 1

 2016-08 1

 2016-08 1   

   Reducer<K2, V2, K3, V3>:K2指shuffle的输出key

   v2:shuffle输出的集合

   k3:reduce阶段的输出key

   v3:reduce阶段的输出value    

 */

public class StatisticsReduce2 extends 

Reducer<Text, Text, Text, LongWritable>{

/**

*  key:对应mapper阶段输出的key类型 ,这里形如 2016-01

* v2s:对应shuffle阶段输出的value,形如 [1,1,1] 

*/

@Override

protected void reduce(Text key, Iterable<Text> iterable,

Reducer<Text, Text, Text, LongWritable>.Context context) 

throws IOException, InterruptedException {

//定义一个total用来统计当月有几个不同的查询内容,有几个即为当月查询次数  

        long total=0;

        //iterable当中存储 [tomcat,cms]  key:2016-01

       for(Text next:iterable){

       FileUtils.writeStringToFile(new File("/home/test"), 

       next.toString(), true);

           total++;     

       }

       FileUtils.writeStringToFile(new File("/home/test"), "\r\n", true);

       //输出<K3,V3>,比如  "2016-01"----2

       context.write(key, new LongWritable(total));

}

Statistics2:

package com.roadjava.hadooptest2;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class Statistics2 { 

public static void main(String[] args) throws Exception {

        //把MapReduce作业抽象成Job对象了并提交

        Job job = Job.getInstance(new Configuration());

        //设置main方法所在的类 

        job.setJarByClass(Statistics2.class);

 

        //接下来我们设置一下Job的Mapper相关属性

        job.setMapperClass(StatisticsMapper2.class);//设置Mapper类

        job.setMapOutputKeyClass(Text.class);//设置K2的类型

        job.setMapOutputValueClass(Text.class);//设置V2的类型

        //接下来我们得告诉程序我们应该去哪里读取文件。需要注意的是args[0]的值是在HDFS文件系统上的路径

        FileInputFormat.setInputPaths(job, new Path(args[0]));

 

        //接下来我们来设置一下Job的Reducer相关属性

        job.setReducerClass(StatisticsReduce2.class);//设置Reducer类

        job.setOutputKeyClass(Text.class);//设置K3的类型

        job.setOutputValueClass(LongWritable.class);//设置V3的类型

        //设置输出结果放在hdfs文件系统的哪个路径

       FileOutputFormat.setOutputPath(job, new Path(args[1])); 

     //把作业提交并且等待执行完成,参数为true的话,会打印进度和详情。

       job.waitForCompletion(true);

 }

}

同样的方法导出为statistics2.jar

image.png

并上传到服务器,然后运行它:

hadoop  jar  statistics2.jar  com.roadjava.hadooptest2.Statistics2  /user/hive/warehouse/test111.db/t_searchword/search.txt  /output201817

查看运行结果:hdfs dfs -cat /output201817/part-r-00000,如下图:

image.png

结果是一样的。我们的mapreduce程序中生成的测试文件/home/test:

[root@node112 /home]#cat -n /home/test

     1tomcatcms

     2iframejava内存溢出慢

     3cmsjavawebvalidateJAVA_OPTS

     4ssh根目录tomcat

     5jdksvn5.0环境tomcattarhrefjava

     6redisadfsd自动完成jpssvnadfdsdfJavaa安装tomcatbootstrap tableCSS图片轮播js

可以看到每一行都是这个月所有的搜索词,正好验证了我们开头说的shuffle所作的工作。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

自己完成一个简单的mapreduce程序 的相关文章

  • Spark 在 Hbase 的 InputSplit 期间给出空指针异常

    我正在使用 Spark 1 2 1 Hbase 0 98 10 和 Hadoop 2 6 0 从 hbase 检索数据时出现空点异常 找到下面的堆栈跟踪 sparkDriver akka actor default dispatcher 2
  • 如何用hadoop实现自连接/叉积?

    对成对的项目进行评估是常见的任务 示例 重复数据删除 协同过滤 相似项目等 这基本上是具有相同数据源的自连接或叉积 要进行自连接 您可以遵循 减少端连接 模式 映射器将连接 外键作为键发出 将记录作为值发出 因此 假设我们想要对以下数据的
  • Hadoop MapReduce:可以在一个 hadoop 作业类中定义两个映射器和缩减器吗?

    我有两个独立的 java 类 用于执行两个不同的 MapReduce 作业 我可以独立运行它们 对于这两个作业 它们所操作的输入文件是相同的 所以我的问题是是否可以在一个java类中定义两个映射器和两个缩减器 例如 mapper1 clas
  • 为什么 Hadoop 中正确的缩减数量是 0.95 或 1.75?

    hadoop 文档指出 正确的归约次数似乎是 0 95 或 1 75 乘以 mapred tasktracker reduce tasks maximum 0 95 所有的减少都可以立即启动并开始 地图完成时传输地图输出 用1 75更快 节
  • mrjob组合器不工作python

    简单的映射组合reduce程序 映射column 1与值column 3并追加 在相同键和附加的每个映射器输出中 减少相同密钥的输出后 input 1 and input 2两个文件都包含 a 1 2 3 a 4 5 6 Code is f
  • 使用 MultipleOutputs 写入 MapReduce 中的 HBase

    我目前有一个 MapReduce 作业 它使用 MultipleOutputs 将数据发送到多个 HDFS 位置 完成后 我使用 HBase 客户端调用 在 MR 之外 将一些相同的元素添加到一些 HBase 表中 使用 TableOutp
  • CouchDB“加入”两个文档

    我有两个看起来有点像这样的文档 Doc id AAA creator id data DataKey id credits left 500 times used 0 data id AAA 我想要做的是创建一个视图 它允许我传递 Data
  • 如何在hadoop/map reduce中创建固定行数的输出文件?

    假设我们有 N 个具有不同行数的输入文件 我们需要生成输出文件 使得每个输出文件恰好有 K 行 最后一个输出文件可以有 是否可以使用单个 MR 作业来完成此操作 我们应该打开文件以便在reducer中显式写入 输出中的记录应该被打乱 tha
  • java.io.IOException:无法获取 LocationBlock 的块长度

    我正在使用 HDP 2 1 对于集群 我遇到了以下异常 并且 MapReduce 作业因此失败 实际上 我们定期使用 Flume 版本的数据创建表 1 4 我检查了映射器尝试读取的数据文件 但我找不到任何内容 2014 11 28 00 0
  • 为什么组合器输入记录的数量比映射的输出数量多?

    Combiner 在 Mapper 之后 Reducer 之前运行 它将接收给定节点上的 Mapper 实例发出的所有数据作为输入 然后它将输出发送到Reducers 因此组合器输入的记录应小于映射输出的记录 12 08 29 13 38
  • Riak 在 MapReduce 查询中失败。使用哪种配置?

    我正在与 riak riak js 结合开发一个 nodejs 应用程序 并遇到以下问题 运行此请求 db mapreduce add logs run 正确返回存储在存储桶日志中的所有 155 000 个项目及其 ID logs 1GXt
  • 如何在 MapReduce 作业中导入自定义模块?

    我有一个 MapReduce 作业定义在main py 它导入了lib模块来自lib py 我使用 Hadoop Streaming 将此作业提交到 Hadoop 集群 如下所示 hadoop jar usr lib hadoop mapr
  • MongoDB 存储过程等效项

    我有一个包含商店列表的大型 CSV 文件 其中一个字段是邮政编码 我有一个名为 ZipCodes 的独立 MongoDB 数据库 它存储任何给定邮政编码的纬度和经度 在 SQL Server 中 我将执行一个名为 InsertStore 的
  • 为什么 CouchDB 归约函数接收“键”作为参数

    使用 CouchDB 减少功能 function keys values rereduce 这被称为这样 reduce key1 id1 key2 id2 key3 id3 value1 value2 value3 false 问题1 将键
  • 是否可以通过编写单独的mapreduce程序并行执行Hive查询?

    我问了一些关于提高 Hive 查询性能的问题 一些答案与映射器和减速器的数量有关 我尝试使用多个映射器和减速器 但在执行中没有看到任何差异 不知道为什么 可能是我没有以正确的方式做 或者我错过了其他东西 我想知道是否可以并行执行 Hive
  • 为什么在我的例子中 For 循环比 Map、Reduce 和 List 理解更快

    我编写了一个简单的脚本来测试速度 这就是我发现的结果 实际上 for 循环在我的例子中是最快的 这真的让我感到惊讶 请查看下面 正在计算平方和 这是因为它在内存中保存列表还是有意为之 谁能解释一下这一点 from functools imp
  • Protobuf RPC 在 Hadoop 2.2.0 单节点服务器上不可用?

    我正在尝试在按照本教程安装的本地单节点集群上运行 hadoop 2 2 0 mapreduce 作业 http codesfusion blogspot co at 2013 10 setup hadoop 2x 220 on ubuntu
  • Hadoop - 直接从 Mapper 写入 HBase

    我有一个 hadoop 作业 其输出应写入 HBase 我并不真正需要减速器 我想要插入的行类型是在映射器中确定的 如何使用 TableOutputFormat 来实现此目的 从所有示例中 我看到的假设是 reducer 是创建 Put 的
  • 为什么 Spark 没有使用本地计算机上的所有核心

    当我在 Spark Shell 中或作为作业运行一些 Apache Spark 示例时 我无法在单台计算机上实现完全的核心利用率 例如 var textColumn sc textFile home someuser largefile t
  • 为什么 Spark 比 Hadoop MapReduce 更快

    有人可以使用字数统计示例解释一下为什么 Spark 比 MapReduce 更快吗 bafna的答案提供了故事的记忆方面 但我想补充另外两个重要事实 DAG和生态系统 Spark 使用 惰性求值 来形成连续计算阶段的有向无环图 DAG 通过

随机推荐

  • 作为一个4年程序员至少需要掌握的专业技能

    一名3年工作经验的程序员应该具备的技能 xff0c 在机缘巧合之中 xff0c 看了这篇博客 感觉自己真的是很差 xff0c 一直想着会写if else 就已经是一名程序员了 xff0c 在工作之余也很少学习 于是 xff0c 自己的cod
  • C语言与C++的区别

    一 C 43 43 简介 本贾尼 斯特劳斯特鲁普 于1979年4月在贝尔实验室负责分析UNIX系统的内核的流量情况 于1979年10月开始着手开发一种新的编程语言 在C语言的基础上增加了面向对象机制 这就是C 43 43 的来历 在1983
  • 我的2011-当梦想照进现实

    我的2011年 xff0c 之所以是现在的样子 xff0c 始缘于我三年前的一个决定 离职考研 对于工作了两年的我来说 xff0c 离职考研是人生的一场博弈 我的2011年 xff0c 结束了研究生期间对三维骨骼动画渲染的相关研究 xff0
  • Dockerfile RUN 同时执行多条命令

    Dockerfile RUN 同时执行多条命令 Dokcerfile中的命令每执行一条即产生一个新的镜像 xff0c 当前命令总是在最新的镜像上执行 如下Dockerfile xff1a RUN span class hljs built
  • HC-SR04超声波模块使用记录

    文章目录 HC SR04超声波模块使用记录轮询测量方式一 模块使用中的问题二 应对方法三 注意 分时测量利用输入捕获测量利用输入捕获测量 HC SR04超声波模块使用记录 具体使用方法见HC SR04使用手册 xff0c 本文重点记录该模块
  • 【C语言冒泡排序、选择排序和快速排序】

    文章目录 前言一 冒泡排序二 选择排序三 快速排序四 代码设计与实现代码设计代码实现 调试结果冒泡排序改良 延伸思考总结 前言 本文简单介绍了C语言的冒泡排序 选择排序 快速排序 xff0c 结合本人的理解与使用做一下记录 一 冒泡排序 思
  • 平衡车制作---原理篇

    平衡车制作 原理篇 文章目录 平衡车制作 原理篇前言直立控制直观感受内部机理 速度控制方向控制总结 前言 本篇教程内容主要来自于 直立平衡车模参考设计方案 xff0c 且这里是从概念层面讲述的并没有具体的控制理论方面的内容 有了这些概念方面
  • FreeRTOS使用注意

    FreeRTOS使用注意 xff1a 中断中必须使用带FromISR结尾的API函数只有中断优先级处于FreeRTOS可管理的范围内时 xff0c 才能使用FreeRTOS提供的API函数中断中不要使用FreeRTOS提供的内存申请和释放函
  • 现代控制理论基础总结

    现代控制理论基础总结 xff08 线性部分 xff09 学习现代控制理论也有两个月的时间了 xff0c 里面涉及的基础内容和公式十分之多 xff0c 所以现在对各部分基础知识作一个总结 1 控制系统的状态表达式 在现代控制理论中 xff0c
  • 题库(关于c++的网站都盘了)大盘点(好多没盘到)

    1 keda ac 2 hydro ac 3 luogu com cn 4 cplusplus com 5 leetcode cn 6 https loj ac 7 noi cn 8 ybt ssoier cn 8088 9 learncp
  • 利用MapReduce进行二次排序--附例子

    首先先来明确几个概念 xff1a 1 分区 partition 1 xff09 分区 xff08 partition xff09 xff1a 默认采取散列值进行分区 xff0c 但此方法容易造成 数据倾斜 xff08 大部分数据分到同一个r
  • MapReduce之单表关联Join输出祖父母、孙子---(附例子)

    需求 xff1a 一个文件 xff0c 有子女和对应的父母 xff0c 要求输出 祖父母 孙子 xff0c 文件如下 xff1a 单表关联 结果 xff1a child parent grand child Tom Lucy Alice T
  • 如何把 ubuntu 16.04.7 命令行界面下的系统语言更改为中文?

    如果你的 ubuntu 16 04 7 系统在命令行下的默认语言是英文 xff0c 比如下面这样 xff1a 怎么更改才能让某些输出单词显示成中文呢 xff1f 可以修改 etc default locale 这个文件 xff0c 先看一下
  • 小程序云开发实现订阅消息

    链接 简书博主示例 xff1a https www jianshu com p d90f22dac001 官方文档 xff1a 官方文档1 文档2 云调用 使用方法demo 假如这是一个点餐系统 xff0c 想让顾客下单以后 xff0c 派
  • ue4 常见问题解答

    1 如何让客户端自动连接服务器 span style color 0000aa MyGame span span style color 000066 span span style color 000066 exe span span s
  • Freertos代码之互斥信号量

    信号量用于限制对共享资源的访问和多任务之间的同步 三个信号量API函数都是宏 xff0c 使用现有的队列实现 使用例子 typedef void QueueHandle t typedef QueueHandle t SemaphoreHa
  • C语言冒泡排序和快速排序的思想和实现

    冒泡排序 基本思想 对有n个记录的序列进行冒泡排序 xff0c 首先将第一个数字与第二个数字进行比较 xff0c 若为逆序 xff0c 则将两个数字的顺序交换 然后比较第二个数字与第三个数字 xff0c 若为逆序 xff0c 则将两个数字的
  • CVPR2023最新论文 (含语义分割、扩散模型、多模态、预训练、MAE等方向)

    CVPR2023论文最新速递 xff01 含分割 VIT 点云等多个方向 2023 年 2 月 28 日凌晨 xff0c CVPR 2023 顶会论文接收结果出炉 xff01 CVPR 2023 收录的工作中 34 扩散模型 多模态 预训练
  • 【Hadoop基础教程】6、Hadoop之单表关联查询

    本blog主要通过输入文件中的child字段和parent字段进行单表关联查询 xff0c 推导出哪些用户具有child与grandparent关系 开发环境 硬件环境 xff1a Centos 6 5 服务器4台 xff08 一台为Mas
  • 自己完成一个简单的mapreduce程序

    hdfs上的数据源 search txt 我们这里依然使用讲解hive案例的那个数据源 xff0c 通过把hive执行的结果与我们自己写的mapreduce的结果作比较 xff0c 来验证是否编码正确 xff0c 以及能否正确理解mapre