hdfs上的数据源 search.txt,我们这里依然使用讲解hive案例的那个数据源,通过把hive执行的结果与我们自己写的mapreduce的结果作比较,来验证是否编码正确,以及能否正确理解mapreduce。数据源参考我之前的文章"hive实例--分析每个月的查询量",比较多,我就不粘贴了。
先定义StatisticsMapper类,继承并实现map方法即可:
package com.roadjava.hadooptest;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/*
* map方法被map task调用,map task每读取一行文本来调用一次map方法 。
* map方法调用完只会的输出类似如下格式(即一行一个1):
* 2016-01 1
* 2016-01 1
* 2016-08 1
*/
public class StatisticsMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
/**
* key:当前行相对于数据源开始位置的偏移量
* value:每一行数据
*/
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
//value表示读取到的每一行原数据
String line = value.toString();
//切分value
String[] lineArr = line.split("\\|");
String time = lineArr[0].trim().substring(0, 7);
//time类似2016-01
context.write(new Text(time), new LongWritable(1));
}
}
再定义StatisticsReduce类:
package com.roadjava.hadooptest;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/*
* * reduce方法提供给reduce task进程来调用
*
map--------> shuffle---------->reduce
2016-01 1 2016-01 [1,1]
2016-01 1 2016-08 [1,1,1]
2016-08 1
2016-08 1
2016-08 1
*/
public class StatisticsReduce extends
Reducer<Text, LongWritable, Text, LongWritable>{
/**
* key:对应mapper阶段输出的key类型 ,这里形如 2016-01
* v2s:对应shuffle阶段输出的value,形如 [1,1,1]
*/
@Override
protected void reduce(Text key, Iterable<LongWritable> v2s,
Reducer<Text, LongWritable, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
//定义一个total用来统计当月有几个不同的查询内容,有几个即为当月查询次数
long total=0;
//v2s当中存储 [1,1,1] key:2016-01
for(LongWritable next:v2s){
total+=next.get();
}
//输出<K3、V3>,比如<"hello", 5>
context.write(key, new LongWritable(total));
}
}
从我注释的内容可以看到,mapreduce实际上有是三个过程:map--------> shuffle---------->reduce,那你会问了,为什么没见你重写shuffle方法呢?因为shuffle不是我们开发可以控制的,所谓mapreduce开发,其实最重要的工作就是重写map方法和reduce方法。因此别看例子小,流程走通才是最重要的。shuffle阶段主要负责处理map阶段的输出,针对map阶段的输出会做如下处理:
一、key相同的合并,怎么合并法呢?key自然不变,把一个个的value放到一个集合里面,在reduce阶段,你就可以使用这个集合迭代出这个key的所有value了。
二、排序,比方说按key从小到大排序
最后,配置一下mapreduce就行了。
package com.roadjava.hadooptest;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Statistics {
public static void main(String[] args) throws Exception {
//把MapReduce作业抽象成Job对象了并提交
Job job = Job.getInstance(new Configuration());
//设置main方法所在的类
job.setJarByClass(Statistics.class);
//接下来我们设置一下Job的Mapper相关属性
job.setMapperClass(StatisticsMapper.class);//设置Mapper类
job.setMapOutputKeyClass(Text.class);//设置K2的类型
job.setMapOutputValueClass(LongWritable.class);//设置V2的类型
//接下来我们得告诉程序我们应该去哪里读取文件。需要注意的是args[0]的值是在HDFS文件系统上的路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
//接下来我们来设置一下Job的Reducer相关属性
job.setReducerClass(StatisticsReduce.class);//设置Reducer类
job.setOutputKeyClass(Text.class);//设置K3的类型
job.setOutputValueClass(LongWritable.class);//设置V3的类型
//设置输出结果放在hdfs文件系统的哪个路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//把作业提交并且等待执行完成,参数为true的话,会打印进度和详情。
job.waitForCompletion(true);
}
}
导出这三个类所在的项目为statistics.jar,并上传到hadoop集群所在机器上,执行如下命令,以下命令用于运行你自己写的这个mapreduce程序:
[root@node113 ~]#hadoop jar statistics.jar com.roadjava.hadooptest.Statistics
/user/hive/warehouse/test111.db/t_searchword/search.txt /output201815
查看运行结果:
与使用hive执行hql语句得到的结果一致,说明编写正确。
上面的例子中,Mapper的四个泛型参数是Mapper<LongWritable, Text, Text, LongWritable>,可能你在看到那篇教程之前,自己看过了wordcount这个经典的mapreduce程序案例,或许你有些许迷惑、又或许你已经形成了思维的定势,即Mapper的泛型参数只能是这个格式:LongWritable, Text, Text, LongWritable,mapreduce也只能用来统计单词出现个数或者每个月的查询量,那就大错特错了。我现在就稍微改动一下程序,希望能打破你这种思维定势。
StatisticsMapper2:
package com.roadjava.hadooptest2;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/*
* map方法被map task调用,map task每读取一行文本来调用一次map方法 。
* map方法调用完只会的输出类似如下格式(即一行一个1):
* 2016-01 tomcat
* 2016-01 cms
* 2016-08 iframe
*
* Mapper<K1,V1,K2,V2>,这里我把V2改为了Text类型,表示map执行完之后输出类型是Text,
* 而不再是LongWritable
*/
public class StatisticsMapper2 extends
Mapper<LongWritable, Text, Text, Text>{
/**
* key:当前行相对于数据源开始位置的偏移量
* value:每一行数据
*/
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
//value表示读取到的每一行原数据
String line = value.toString();
//切分value
String[] lineArr = line.split("\\|");
String time = lineArr[0].trim().substring(0, 7);
String cont=lineArr[1].trim();
//2016-01 tomcat
context.write(new Text(time), new Text(cont));
}
}
StatisticsReduce2:
package com.roadjava.hadooptest2;
import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/*
* * reduce方法提供给reduce task进程来调用 ,map过后执行shuffle
,shuffle会将相同的key对应的值排序后堆叠在一起,堆叠的结果就是形成了reduce的
第二个参数iterable,针对经过shuffle之后的输出结果,每一组相同的key都会调用一次reduce
(比如"2016-01 [tomcat,cms]"与“2016-08 [iframe,java内存溢出,慢]”分别调用
2次reduce函数)
map--------> shuffle---------->reduce
2016-01 12016-01 [tomcat,cms]
2016-01 12016-08 [iframe,java内存溢出,慢]
2016-08 1
2016-08 1
2016-08 1
Reducer<K2, V2, K3, V3>:K2指shuffle的输出key
v2:shuffle输出的集合
k3:reduce阶段的输出key
v3:reduce阶段的输出value
*/
public class StatisticsReduce2 extends
Reducer<Text, Text, Text, LongWritable>{
/**
* key:对应mapper阶段输出的key类型 ,这里形如 2016-01
* v2s:对应shuffle阶段输出的value,形如 [1,1,1]
*/
@Override
protected void reduce(Text key, Iterable<Text> iterable,
Reducer<Text, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
//定义一个total用来统计当月有几个不同的查询内容,有几个即为当月查询次数
long total=0;
//iterable当中存储 [tomcat,cms] key:2016-01
for(Text next:iterable){
FileUtils.writeStringToFile(new File("/home/test"),
next.toString(), true);
total++;
}
FileUtils.writeStringToFile(new File("/home/test"), "\r\n", true);
//输出<K3,V3>,比如 "2016-01"----2
context.write(key, new LongWritable(total));
}
}
Statistics2:
package com.roadjava.hadooptest2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Statistics2 {
public static void main(String[] args) throws Exception {
//把MapReduce作业抽象成Job对象了并提交
Job job = Job.getInstance(new Configuration());
//设置main方法所在的类
job.setJarByClass(Statistics2.class);
//接下来我们设置一下Job的Mapper相关属性
job.setMapperClass(StatisticsMapper2.class);//设置Mapper类
job.setMapOutputKeyClass(Text.class);//设置K2的类型
job.setMapOutputValueClass(Text.class);//设置V2的类型
//接下来我们得告诉程序我们应该去哪里读取文件。需要注意的是args[0]的值是在HDFS文件系统上的路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
//接下来我们来设置一下Job的Reducer相关属性
job.setReducerClass(StatisticsReduce2.class);//设置Reducer类
job.setOutputKeyClass(Text.class);//设置K3的类型
job.setOutputValueClass(LongWritable.class);//设置V3的类型
//设置输出结果放在hdfs文件系统的哪个路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//把作业提交并且等待执行完成,参数为true的话,会打印进度和详情。
job.waitForCompletion(true);
}
}
同样的方法导出为statistics2.jar
并上传到服务器,然后运行它:
hadoop jar statistics2.jar com.roadjava.hadooptest2.Statistics2 /user/hive/warehouse/test111.db/t_searchword/search.txt /output201817
查看运行结果:hdfs dfs -cat /output201817/part-r-00000,如下图:
结果是一样的。我们的mapreduce程序中生成的测试文件/home/test:
[root@node112 /home]#cat -n /home/test
1tomcatcms
2iframejava内存溢出慢
3cmsjavawebvalidateJAVA_OPTS
4ssh根目录tomcat
5jdksvn5.0环境tomcattarhrefjava
6redisadfsd自动完成jpssvnadfdsdfJavaa安装tomcatbootstrap tableCSS图片轮播js
可以看到每一行都是这个月所有的搜索词,正好验证了我们开头说的shuffle所作的工作。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)