Hadoop Mapreduce编程之Reduce端join实现

2023-11-04

1.数据准备

movies.dat 数据格式: // movieid::moviename::movietype

ratings.dat 数据格式: // userid::movieid::rating::timestamp

2.Mapper端开发
1)定义必要的变量
private String filename="";
IntWritable mk=new IntWritable();
Text mv=new Text();
2)通过重写setup方法获取切片的文件名
@Override
protected void setup(Context context) throws IOException, InterruptedException {
    FileSplit inputSplit = (FileSplit) context.getInputSplit(); // 获取文件切片
    filename = inputSplit.getPath().getName();   //获取文件名
}
3) map方法—根据每个文件的文件名不同来标识不同表
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String[] lines = value.toString().split("::");
    if(filename.equals("movies.dat")){ // movieid::moviename::movietype
        mk.set(Integer.parseInt(lines[0].trim()));
        mv.set("M"+lines[1]+"\t"+lines[2]);
    }else{// 文件名为ratings.dat
        mk.set(Integer.parseInt(lines[1].trim()));// userid::movieid::rating::timestamp
        mv.set("R"+lines[0]+"\t"+lines[2]+"\t"+lines[3]);
    }
    context.write(mk,mv);
}
3.Reducer端开发
public class RatingReduce extends Reducer<IntWritable, Text,IntWritable,Text> {
    Text mv=new Text();
    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        List<String> mlist=new ArrayList<>();
        List<String> rlist=new ArrayList<>();
        for (Text value : values) {
            String info =value.toString();
            if(info.startsWith("M")){
                mlist.add(info.substring(1));
            }else {
                rlist.add(info.substring(1));
            }
        }
        // 开始拼接
        for (String movie : mlist) {
            for (String rating : rlist) {
                String res=movie+"\t"+rating;
                mv.set(res);
                context.write(key,mv);
            }
        }
    }
}
4.Driver端开发
public class RatingDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        System.setProperty("HADOOP_USER_NAME","hadoop");
        Configuration conf=new Configuration();
        conf.set("fs.defaultFS","hdfs:/mkmg/");
        Job job = Job.getInstance(conf);

        job.setJarByClass(RatingDriver.class);

        job.setMapperClass(RatingMapper.class);
        job.setReducerClass(RatingReduce.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job,new Path("D://movie/ratings.dat"),new Path("D://movie/movies.dat"));
        FileSystem fs=FileSystem.get(conf);
        Path out=new Path("D://movie_out");
        if(fs.exists(out)){
            fs.delete(out,true);
        }
        FileOutputFormat.setOutputPath(job,out);

        job.waitForCompletion(true);
    }
}
5.结论总结
/**
 * reduce端join的缺陷:-----适合大表和大表关联
 *      1)数据倾斜---分区分布不均匀
 *      2)因为reduce端采用的集合,数据量大的时候,可能会产生OOM
 *      3)reducetask本身并行度不高,导致性能比较低----经验值是:DataNode数量*0.95
 */
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Hadoop Mapreduce编程之Reduce端join实现 的相关文章

  • couchdb 视图使用另一个视图?

    我对 couchdb 中的视图有疑问 目前 我有许多视图 例如 view A view B view Z 对于每个视图 它们包含相同范围的键但具有不同的值 IE view A key key 1 value 10 key key 2 val
  • MongoDB 从两个数组计算值、排序和限制

    我有一个存储浮点数组的 MongoDB 数据库 假设以下格式的文档集合 id 0 vals 0 8 0 2 0 5 有一个查询数组 例如 带有值 0 1 0 3 0 4 我想计算集合中所有元素的距离 例如 差异之和 对于给定的文档和查询 它
  • PHP MongoDB映射减少数据库断言失败

    我第一次使用 PHP MongoDB 进行 Map Reduce 运行 MapReduce 命令时遇到错误 My code map function emit this topic id re date this date posted r
  • 如何具体确定MRJob中每个map步骤的输入?

    我正在从事一项地图缩减工作 包含多个步骤 使用 mrjob 每个步骤都会接收上一步的输出 问题是我不想这样 我想要的是提取一些信息并在第二步中针对所有输入等使用它 可以使用 mrjob 来做到这一点吗 Note 因为我不想使用emr 这个问
  • Log4j RollingFileAppender 未将映射器和减速器日志添加到文件中

    我们希望将应用程序日志打印到本地节点上的文件中 我们使用 Log4j 的 RollingFileAppender Our log4j properties文件如下 ODS LOG DIR var log appLogs ODS LOG IN
  • 在 Hadoop MapReduce 中解析 PDF 文件

    我必须在 Hadoop 的 MapReduce 程序中解析 HDFS 中的 PDF 文件 所以我从 HDFS 获取 PDF 文件为输入分割它必须被解析并发送到 Mapper 类 为了实现这个输入格式我已经经历过这个link http cod
  • FAILED 错误:java.io.IOException:所有收集器的初始化失败

    我在运行 MapReduce WordCount 作业时遇到一些错误 错误 java io IOException 所有收集器的初始化 失败的 最后一个收集器中的错误是 class wordcount wordmapper at org a
  • CouchDB“加入”两个文档

    我有两个看起来有点像这样的文档 Doc id AAA creator id data DataKey id credits left 500 times used 0 data id AAA 我想要做的是创建一个视图 它允许我传递 Data
  • hadoop map reduce 中的错误处理

    根据文档 有几种方法可以在 MapReduce 中执行错误处理 以下是一些 A 使用枚举的自定义计数器 每个失败记录的增量 b 记录错误并稍后分析 计数器给出失败记录的数量 然而 为了获取失败记录的标识符 可能是其唯一键 以及发生异常的详细
  • 映射减少计数示例

    我的问题是关于mapreduce programming in java 假设我有 WordCount java 示例 一个标准mapreduce program 我希望map函数收集一些信息 并返回形成如下的reduce函数map
  • 远程执行hadoop作业时出现异常

    我正在尝试在远程 hadoop 集群上执行 Hadoop 作业 下面是我的代码 Configuration conf new Configuration conf set fs default name hdfs server 9000 c
  • java.lang.IllegalArgumentException:错误的 FS:,预期:hdfs://localhost:9000

    我正在尝试实现reduce side join 并使用mapfile reader来查找分布式缓存 但在stderr中检查时它没有查找值 它显示以下错误 lookupfile文件已经存在于hdfs中 并且似乎已正确加载进入缓存 如标准输出中
  • 遍历 ArrayWritable - NoSuchMethodException

    我刚刚开始使用 MapReduce 并且遇到了一个奇怪的错误 我无法通过 Google 回答该错误 我正在使用 ArrayWritable 制作一个基本程序 但是当我运行它时 在Reduce过程中出现以下错误 java lang Runti
  • RavenDB:为什么我会在此多重映射/归约索引中获得字段空值?

    受到 Ayende 文章的启发https ayende com blog 89089 ravendb multi maps reduce indexes https ayende com blog 89089 ravendb multi m
  • 在映射器的单个输出上运行多个减速器

    我正在使用地图缩减实现左连接功能 左侧有大约 6 亿条记录 右侧有大约 2300 万条记录 在映射器中 我使用左连接条件中使用的列来创建键 并将键值输出从映射器传递到减速器 我遇到性能问题 因为两个表中的值数量都很高的映射器键很少 例如分别
  • 运行 Sqoop 导入和导出时如何找到最佳映射器数量?

    我正在使用 Sqoop 版本 1 4 2 和 Oracle 数据库 运行 Sqoop 命令时 例如这样 sqoop import fs
  • 更改 Hadoop 中的数据节点数量

    如何改变数据节点的数量 即禁用和启用某些数据节点来测试可扩展性 说得更清楚一点 我有4个数据节点 我想一一实验1 2 3 4个数据节点的性能 是否可以只更新名称节点中的从属文件 临时停用节点的正确方法 创建一个 排除文件 这列出了您想要删除
  • MongoDB/PyMongo:如何在 Map 函数中使用点表示法?

    我正在尝试计算每个邮政编码中找到的记录数 在我的 MongoDB 中 嵌入了邮政编码 使用点表示法 它位于 a res z a 代表地址 res 代表住宅 z 代表邮政编码 例如 这工作得很好 db NY count a res z 141
  • 是否可以通过编写单独的mapreduce程序并行执行Hive查询?

    我问了一些关于提高 Hive 查询性能的问题 一些答案与映射器和减速器的数量有关 我尝试使用多个映射器和减速器 但在执行中没有看到任何差异 不知道为什么 可能是我没有以正确的方式做 或者我错过了其他东西 我想知道是否可以并行执行 Hive
  • 为什么在我的例子中 For 循环比 Map、Reduce 和 List 理解更快

    我编写了一个简单的脚本来测试速度 这就是我发现的结果 实际上 for 循环在我的例子中是最快的 这真的让我感到惊讶 请查看下面 正在计算平方和 这是因为它在内存中保存列表还是有意为之 谁能解释一下这一点 from functools imp

随机推荐