hadoop join之map side join

2023-11-12

在本例中,我们仍然采用上一例中的数据文件。之所以存在reduce side join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://jobtracker:50030/home/XXX/file)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。

本实例中的运行参数需要三个,加入在hdfs中有两个目录input和input2,其中input2存放user.csv,input存放order.csv,则运行命令格式如下:hadoop jar xxx.jar JoinWithDistribute input2/user.csv input output。

具体实例如下,此实例我们采用旧的API来写:

public class JoinWithDistribute extends Configured implements Tool
{

    public static class MapClass extends MapReduceBase 
        implements Mapper<LongWritable, Text, Text, Text>
    {

        //用于缓存小表的数据,在这里我们缓存user.csv文件中的数据
        private Map<String, String> users = new HashMap<String, String>();

        private Text outKey = new Text();

        private Text outValue = new Text();

        //此方法会在map方法执行之前执行
        @Override
        public void configure(JobConf job)
        {
            BufferedReader in = null;

            try
            {
                //从当前作业中获取要缓存的文件
                Path[] paths = DistributedCache.getLocalCacheFiles(job);
                String user = null;
                String[] userInfo = null;

                for (Path path : paths)
                {
                    if (path.toString().contains("user.csv"))
                    {
                        in = new BufferedReader(new FileReader(path.toString()));
                        while (null != (user = in.readLine()))
                        {
                            userInfo = user.split(",", 2);
                            //缓存文件中的数据
                            users.put(userInfo[0], userInfo[1]);
                        }
                    }
                }
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
            finally
            {
                try
                {
                    in.close();
                }
                catch (IOException e)
                {
                    e.printStackTrace();
                }
            }
        }

        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, 
                Reporter reporter) throws IOException
        {
            //首先获取order文件中每条记录的userId,
            //再去缓存中取得相同userId的user记录,合并两记录并输出之。
            String[] order = value.toString().split(",");
            String user = users.get(order[0]);
            
            if(user != null)
            {
                outKey.set(user);
                outValue.set(order[1]);
                output.collect(outKey, outValue);
            }
        }

    }

    public int run(String[] args) throws Exception
    {
        JobConf job = new JobConf(getConf(), JoinWithDistribute.class);

        job.setJobName("JoinWithDistribute");
        job.setMapperClass(MapClass.class);
        job.setNumReduceTasks(0);

        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        
        //我们把第一个参数的地址作为要缓存的文件路径
        DistributedCache.addCacheFile(new Path(args[0]).toUri(), job);
        FileInputFormat.setInputPaths(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        JobClient.runJob(job);

        return 0;
    }

    public static void main(String[] args) throws Exception
    {
        int res = ToolRunner.run(new Configuration(), new JoinWithDistribute(), args);
        System.exit(res);
    }

}




 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

hadoop join之map side join 的相关文章

  • 在Python中从字符串中删除除字母数字字符之外的所有内容

    使用 Python 从字符串中去除所有非字母数字字符的最佳方法是什么 中提出的解决方案这个问题的 PHP 变体 https stackoverflow com questions 840948可能会进行一些小的调整 但对我来说似乎不太 Py
  • 计算字符串的所有子串中子序列的出现次数

    我想编写一个算法来计算字符串的所有子字符串中字符子序列 不相交 出现的总数 下面是一个例子 字符串 jabcohnnyjohnny 后续 约翰尼 包含子序列的子字符串 jabcohnny jabcohnnyj jabcohnnyjo jab
  • 如何在 Rust 中从文字创建格式化字符串?

    我将根据给定的参数返回一个字符串 fn hello world name Option
  • PHP中用逗号分解复杂字符串

    我需要分割一个包含逗号的字符串 我已经找到了类似字符串的东西 str getcsv A B with a comma eh C 但我的字符串是这样的 例如值没有包含字符 A B one two C 我需要分解它并获得 array 3 0 g
  • 如何使用 Swift 4 将字符串拆分为英语和非英语?

    我有一个包含英语和阿拉伯语的字符串 我正在使用 API 这就是为什么我无法在其中设置指标的原因 我想要得到的是 阿拉伯语和英语分成两部分 这是一个示例字符串 Bismika rabbee wadaAAtu janbee wabika arf
  • 字符串插值搜索

    对于那些不熟悉插值搜索的人来说 这是一种在排序数组中搜索值的方法 可能比二分搜索更快 您查看第一个和最后一个元素 并 假设数组的内容均匀分布 线性插值以预测位置 例如 我们有一个长度为 100 的数组 其中 array 0 0 和 arra
  • 如何将整数日期转换为格式化日期字符串(即 2012009 到 2/01/2009)

    有任何想法吗 我想不出任何办法 我有一个从 csv 文件加载的日期列表 它们被保存为所有整数 或者更确切地说是一串整数 即 2009 年 1 月 1 日 1012009 关于如何将 1012009 变成 1 01 2009 有什么想法吗 T
  • 具有多行值的 PEP8 多行字典

    我使用 Black for Python 它符合 PEP8 https github com psf black the black code style 它删除两行长值字符串的第二行的缩进 mydict key0 value0 key1
  • 没有函数映射到名称“coord:formatTime”

    我正在尝试使用 oozie 中的以下内容获取当前时间戳
  • C++:初始化静态字符串成员

    我在 C 中初始化静态字符串成员时遇到一些问题 我有几个类 每个类都包含几个表示 id 的静态字符串成员 当我通过调用静态函数初始化变量时 一切都很好 但是 当我想为一个变量分配另一个变量的值时 它仍然保留空字符串 这段代码有什么问题 st
  • 当字符串的长度大于n时,如何打印字符串的前n个字节?

    所以我有一个具有一定字节数 或长度 的字符串 我说字节是因为字符串末尾没有 NULL 终止符 不过 我知道绳子有多长 通常 众所周知 当您printf s str 它将继续打印每个字节 直到到达 NULL 字符 我知道没有 C 字符串不是
  • 当sql连接中存在两个同名列时,如何从一个表列中获取值

    当我连接两个具有相同名称列的表时 我目前面临着尝试获取值的问题 例如 table1 date和table2 date 每个表中的日期不同 我将如何获取 日期 本例中的表1 我目前正在跑步 while row mysqliquery gt f
  • 在 Objective-C 中比较 2 个字符串

    我写了以下代码 if depSelectedIndice gt 1 comSelectedIndice gt 1 NSLog depart elemet d depSelectedIndice NSLog depart elemet d c
  • 通过Oozie命令行指定多个过滤条件

    我正在尝试通过命令行搜索一些特定的 oozie 作业 我使用以下语法进行相同的操作 oozie jobs filter status RUNNING status KILLED 但是 该命令仅返回正在运行的作业 而不是已杀死的作业 需要帮助
  • java中如何连接字符串

    这是我的字符串连接代码 StringSecret java public class StringSecret public static void main String args String s new String abc s co
  • Python:删除字符串开头的数字

    我有一些这样的字符串 string1 123 123 This is a string some other numbers string2 1 This is a string some numbers string3 12 3 12 T
  • 如何在 python 3.x 中使用 string.replace()

    The string replace 在 python 3 x 上已弃用 这样做的新方法是什么 与 2 x 一样 使用str replace https docs python org library stdtypes html str r
  • 制作一个js数组,输入框的值用空格分隔

    我正在尝试使用文本框进行用户输入并将它们放入数组中 基本上 如果用户输入像这样的字符串 10 23 4566 234 10 我希望将数字放入 10 23 4566 234 10 等数组中 这可能吗 我只处理数字 字符串可能会很长 你会想要使
  • 如何使用InputConnectionWrapper?

    我有一个EditText 现在我想获取用户对此所做的所有更改EditText并在手动将它们插入之前使用它们EditText 我不希望用户直接更改中的文本EditText 这只能由我的代码完成 例如通过使用replace or setText
  • 如何在hadoop mapreduce/yarn中设置VCORES?

    以下是我的配置 mapred site xml map mb 4096 opts Xmx3072m reduce mb 8192 opts Xmx6144m yarn site xml resource memory mb 40GB min

随机推荐

  • 四层负载均衡的NAT模型与DR模型推导

    导读 本文首先讲述四层负载均衡技术的特点 然后通过提问的方式推导出四层负载均衡器的NAT模型和DR模型的工作原理 通过本文可以了解到四层负载均衡的技术特点 NAT模型和DR模型的工作原理 以及NAT模型和DR模型的优缺点 读者可以重点关注N
  • 【IDEA】IDEA 下 maven 一个诡异问题,一个正常项目 过了一夜 依赖很多 飘红

    文章目录 1 场景1 1 1 概述 2 场景再现2 1 场景1 1 1 概述 我有一个项目是flink 1 9 升级到 flink 1 10 升级完毕后 我都在服务器打包了 然后过了一夜后 第二天也能正常打包 然后下午的时候 去运行主类 本
  • 【Unity底层插件】Dll打包のBug

    1 修改官方demoRenderingPlugin cpp时 UnityPluginLoad不会被调用 解决方案 https forum unity3d com threads native plug in issues unityplug
  • STM32系统时钟频率更改

    注 此文仅作为个人学习记录 海创学习记录 图0 手册时钟图 stm32的系统时钟频率在驱动文件中一般情况下是被固定的 系统频率有几种 分别为24MHz 36MHz 48MHz 56MHz 72MHz 一般情况下 md s 默认设置为72MH
  • Android最佳实践——深入浅出WebSocket协议

    转自 https blog csdn net sbsujjbcy article details 52839540 首先明确一下概念 WebSocket协议是一种建立在TCP连接基础上的全双工通信的协议 概念强调了两点内容 TCP基础上 全
  • 树莓派+多个微雪电子Serial Expansion HAT扩展板叠加方法(扩展多个IO口、串口)

    微雪电子官方教程仅介绍了单层扩展板配置方法 因此本文参考官方教程的基础上 进行多个扩展板的叠加配置 文章目录 一 打开I2C接口 二 安装库 三 生成设备 四 堆叠教程 五 扩展IO口配置 总结 一 打开I2C接口 在终端执行 sudo r
  • Lyapunov稳定性分析1(正定函数、二次型正定判定)

    一 正定函数 1 1 定义 令V x 是向量x的标量函数 S是x空间包含原点的封闭有限区域 如果对于S中的所有x 都有 则V x 是正定的 半正定 正定函数更直观的描述如下图所示 如果条件 3 中不等式的符号反向 则称V x 是负定的 负半
  • 拼多多产品怎么引流?拼多多商品怎么引更多的流量?

    说到拼多多引流技巧 对于有资源的商家来说可能没有什么难事 而对于一些刚刚入手这个行业的商家来说未尝不是一种借鉴 博傲电商今天分享几点 首先是说下直通车引流 这个方法简单粗暴 看上去只要烧钱投放广告 流量都可以进来 是一个效果比较好的方法 但
  • android内存管理 面试题,Android面试题内存&性能篇

    Android面试题内存 性能篇 由本人整理汇总 后续将继续推出系列篇 如果喜欢请持续关注和推荐 内存分配 RAM random access memory 随机存取存储器 说白了就是内存 一般Java在内存分配时会涉及到以下区域 寄存器
  • DNS 协议是什么?完整查询过程?为什么选择使用 UDP 协议发起 DNS 查询?

    你可能了解 DNS 协议是什么 那你了解 DNS 完整查询过程是什么吗 它底层是基于 TCP 还是 UDP 喃 TCP 与 UDP 又各自负责 DNS 的哪些部分喃 引言 本文从以下几个方面循序渐进走进 DNS 协议 它的完整查询过程以及底
  • 数据结构---堆----C语言实现

    目录 堆排序介绍 功能介绍 功能实现 公式 功能合并 这里添加另一种方法 只用到了向下调整法就可排序 源码1 源码2 堆排序介绍 堆排序 Heapsort 是指利用堆这种数据结构所设计的一种排序算法 是一个近似完全二叉树的结构 并同时满足堆
  • Android中协调布局CoordinatorLayout的使用

    Android自5 0之后对UI做了较大的提升 一个重大的改进是推出了MaterialDesign库 而该库的基础即为协调布局CoordinatorLayout 几乎所有的design控件都依赖于该布局 今天我们就学习一下Coordinat
  • 程序分析 clang系列学习 (二)

    clang静态检测 clang API AST匹配部分 UseAfterMoveCheck 问题概述 示例 代码 AST CFG 检测步骤 算法大致流程 代码 这里 我主要通过clang API实现自定义的代码检测工具 采用的方式类似于cl
  • 非线性弹簧摆的仿真(Matlab代码实现)

    欢迎来到本博客 博主优势 博客内容尽量做到思维缜密 逻辑清晰 为了方便读者 座右铭 行百里者 半于九十 本文目录如下 目录 1 概述 2 运行结果 3 参考文献 4 Matlab代码实现 1 概述 本文模拟非线性弹簧摆 弹簧运动和摆锤运动的
  • html dom动态添加id,JavaScript/DOM - 给新创建的元素添加ID

    如何通过JavaScript DOM将元素ID应用于新创建的元素 JavaScript DOM 给新创建的元素添加ID 我写的代码创建了一个由按钮触发的表 我需要为此表应用一个唯一的ID 因此它可以与我的网站上出现的其他人的样式不同 这里是
  • 计算机操作系统之期末考试复习——进程的基本状态及转换

    进程的基本状态 就绪状态 Ready 进程已处于准备好运行的状态 即进程已分配到除CPU以外的所有必要资源后 只要获得CPU 便可立即执行 执行状态 Running 进程以获得CPU 其程序正在执行的状态 阻塞状态 Block 正在执行的进
  • Python调用海康威视网络相机之——python调用海康威视C++的SDK

    运行环境 Win10系统 64位 Anaconda3 python 3 5 5 基于anaconda环境 opencv 3 4 2 早前就已经安装了 本文不会讲如何安装 Visual Studio 2015 硬件 DS 2CD3T56DWD
  • 记录qt窗口在拖动过程中出现的问题

    问题描述 在窗口拖动的过程中刚开始可以流畅的拖动窗口 但是一小会儿之后出现窗口拖不动的现象 或者按下鼠标在拖动区域内可以流畅拖动 但是按下鼠标朝一个方向拖动后 释放鼠标 按照此操作操作几次后 出现窗口拖不动的情况 但是过一会儿后窗口又可以被
  • 使用Python批量将EXCEL转为CSV文件

    今天遇到一个批量将100多个EXCEL 且每个excel中有多个sheet页 转为CSV文件的需求 同事使用wps右键逐个拆分 效率实在太慢 网上查了一下 使用pandas库中的to csv 方法可以解决这个问题 下面这段代码可以批量处理e
  • hadoop join之map side join

    在本例中 我们仍然采用上一例中的数据文件 之所以存在reduce side join 是因为在map阶段不能获取所有需要的join字段 即 同一个key对应的字段可能位于不同map中 Reduce side join是非常低效的 因为shu