如何使用Python为Hadoop编写一个简单的MapReduce程序

2023-05-16

转载自:http://asfr.blogbus.com/logs/44208067.html

          在这个实例中,我将会向大家介绍如何使用Python 为  Hadoop编写一个简单的 MapReduce
程序。
尽管 Hadoop 框架是使用Java编写的但是我们仍然需要使用像C++、Python等语言来实现 Hadoop程序。尽管 Hadoop官方网站给的示例程序是使用Jython编写并打包成Jar文件,这样显然造成了不便,其实,不一定非要这样来实现,我们可以使用Python与 Hadoop 关联进行编程 ,看看位于/src/examples/python/WordCount.py  的例子,你将了解到我在说什么。

我们想要做什么?

我们将编写一个简单的  MapReduce 程序,使用的是C-Python,而不是Jython编写后打包成jar包的程序。
我们的这个例子将模仿  WordCount 并使用Python来实现,例子通过读取文本文件来统计出单词的出现次数。结果也以文本形式输出,每一行包含一个单词和单词出现的次数,两者中间使用制表符来想间隔。

先决条件

编写这个程序之前,你学要架设好 Hadoop 集群,这样才能不会在后期工作抓瞎。如果你没有架设好,那么在后面有个简明教程来教你在Ubuntu Linux 上搭建(同样适用于其他发行版linux、unix)

如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立单节点的 Hadoop 集群

如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立多节点的 Hadoop 集群


Python的MapReduce代码


使用Python编写MapReduce代码的技巧就在于我们使用了  HadoopStreaming 来帮助我们在Map 和 Reduce间传递数据通过STDIN (标准输入)和STDOUT (标准输出).我们仅仅使用Python的 sys.stdin来输入数据,使用 sys.stdout输出数据,这样做是因为HadoopStreaming会帮我们办好 其他事。这是真的,别不相信!

Map: mapper.py


将下列的代码保存在 /home/hadoop/mapper.py中,他将从STDIN读取数据并将单词成行分隔开,生成一个列表映射单词与发生次数的关系:
注意:要确保这个脚本有足够权限 (chmod +x /home/hadoop/mapper.py)。


#!/usr/bin/env python
 
import sys
 
# input comes from STDIN (standard input)
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into words
words = line.split()
# increase counters
for word in words:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Reduce step, i.e. the input for reducer.py
#
# tab-delimited; the trivial word count is 1
print '%s\\t%s' % (word, 1)
在这个脚本中,并不计算出单词出现的总数,它将输出 "<word> 1" 迅速地,尽管<word>可能会在输入中出现多次,计算是留给后来的Reduce步骤(或叫做程序)来实现。当然你可以改变下编码风格,完全尊重你的习惯。

Reduce: reducer.py


将代码存储在 /home/hadoop/reducer.py 中,这个脚本的作用是从 mapper.py 的STDIN中读取结果,然后计算每个单词出现次数的总和,并输出结果到STDOUT。
同样,要注意脚本权限: chmod +x /home/hadoop/reducer.py


#!/usr/bin/env python
 
from operator import itemgetter
import sys
 
# maps words to their counts
word2count = {}
 
# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
 
# parse the input we got from mapper.py
word, count = line.split('\\t', 1)
# convert count (currently a string) to int
try:
count = int(count)
word2count[word] = word2count.get(word, 0) + count
except ValueError:
# count was not a number, so silently
# ignore/discard this line
pass
 
# sort the words lexigraphically;
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))
 
# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
print '%s\\t%s'% (word, count)

测试你的代码( cat data | map | sort | reduce)


我建议你在运行MapReduce job测试前尝试手工测试你的 mapper.py 和  reducer.py脚本,以免得不到任何返回结果
这里有一些建议,关于如何测试你的Map和Reduce的功能:

——————————————————————————————————————————————  
\r\n

 # very basic test
hadoop@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hadoop/mapper.py
foo 1
foo 1
quux 1
labs 1
foo 1
bar 1
——————————————————————————————————————————————
hadoop@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hadoop/mapper.py | sort | /home/hadoop/reducer.py
bar 1
foo 3
labs 1
——————————————————————————————————————————————

# using one of the ebooks as example input
# (see below on where to get the ebooks)
hadoop@ubuntu:~$ cat /tmp/gutenberg/20417-8.txt | /home/hadoop/mapper.py
The 1
Project 1
Gutenberg 1
EBook 1
of 1
[...]
(you get the idea)

quux 2

quux 1


——————————————————————————————————————————————

在Hadoop平台上运行Python脚本

为了这个例子,我们将需要三种电子书:

  • The Outline of Science, Vol. 1 (of 4) by J. Arthur Thomson\r\n
  • The Notebooks of Leonardo Da Vinci\r\n
  • Ulysses by James Joyce
下载他们,并使用 us-ascii编码存储 解压后的文件,保存在临时目录,比如 /tmp/gutenberg.


 hadoop@ubuntu:~$ ls -l /tmp/gutenberg/
total 3592
-rw-r--r-- 1 hadoop hadoop 674425 2007-01-22 12:56 20417-8.txt
-rw-r--r-- 1 hadoop hadoop 1423808 2006-08-03 16:36 7ldvc10.txt
-rw-r--r-- 1 hadoop hadoop 1561677 2004-11-26 09:48 ulyss12.txt
hadoop@ubuntu:~$


复制本地数据到HDFS

在我们运行MapReduce job 前,我们需要将本地的文件复制到HDFS中:

hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -copyFromLocal /tmp/gutenberg gutenberg
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls
Found 1 items
/user/hadoop/gutenberg <dir>
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls gutenberg
Found 3 items
/user/hadoop/gutenberg/20417-8.txt <r 1> 674425
/user/hadoop/gutenberg/7ldvc10.txt <r 1> 1423808
/user/hadoop/gutenberg/ulyss12.txt <r 1> 1561677

执行 MapReduce job

现在,一切准备就绪,我们将在运行Python MapReduce job 在Hadoop集群上。像我上面所说的,我们使用的是
HadoopStreaming 帮助我们传递数据在Map和Reduce间并通过STDIN和STDOUT,进行标准化输入输出。


hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar
 -mapper /home/hadoop/mapper.py -reducer /home/hadoop/reducer.py -input gutenberg/*
-output gutenberg-output
在运行中,如果你想更改Hadoop的一些设置,如增加Reduce任务的数量,你可以使用“-jobconf”选项:

hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar
-jobconf mapred.reduce.tasks=16 -mapper ...

一个重要的备忘是关于Hadoop does not honor mapred.map.tasks
这个任务将会读取HDFS目录下的gutenberg并处理他们,将结果存储在独立的结果文件中,并存储在HDFS目录下的
gutenberg-output目录。
之前执行的结果如下:

hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar
-mapper /home/hadoop/mapper.py -reducer /home/hadoop/reducer.py -input gutenberg/*
-output gutenberg-output

additionalConfSpec_:null
null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
packageJobJar: [/usr/local/hadoop-datastore/hadoop-hadoop/hadoop-unjar54543/]
[] /tmp/streamjob54544.jar tmpDir=null
[...] INFO mapred.FileInputFormat: Total input paths to process : 7
[...] INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop-datastore/hadoop-hadoop/mapred/local]
[...] INFO streaming.StreamJob: Running job: job_200803031615_0021
[...]
[...] INFO streaming.StreamJob: map 0% reduce 0%
[...] INFO streaming.StreamJob: map 43% reduce 0%
[...] INFO streaming.StreamJob: map 86% reduce 0%
[...] INFO streaming.StreamJob: map 100% reduce 0%
[...] INFO streaming.StreamJob: map 100% reduce 33%
[...] INFO streaming.StreamJob: map 100% reduce 70%
[...] INFO streaming.StreamJob: map 100% reduce 77%
[...] INFO streaming.StreamJob: map 100% reduce 100%
[...] INFO streaming.StreamJob: Job complete: job_200803031615_0021


[...] INFO streaming.StreamJob: Output: gutenberg-output hadoop@ubuntu:/usr/local/hadoop$


正如你所见到的上面的输出结果,Hadoop 同时还提供了一个基本的WEB接口显示统计结果和信息。
当Hadoop集群在执行时,你可以使用浏览器访问 http://localhost:50030/  ,如图:


使用Python实现Hadoop MapReduce程序 - ASFR! - ASFR s Blog-on mywy

检查结果是否输出并存储在HDFS目录下的gutenberg-output中:

hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls gutenberg-output
Found 1 items
/user/hadoop/gutenberg-output/part-00000 <r 1> 903193 2007-09-21 13:00
hadoop@ubuntu:/usr/local/hadoop$

可以使用dfs -cat 命令检查文件目录

hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -cat gutenberg-output/part-00000
"(Lo)cra" 1
"1490 1
"1498," 1
"35" 1
"40," 1
"A 2
"AS-IS". 2
"A_ 1
"Absoluti 1
[...]
hadoop@ubuntu:/usr/local/hadoop$

注意比输出,上面结果的(")符号不是Hadoop插入的。


改善Mapper 和 Reducer 使用Python的iterators 和 generators

请参考:Python iterators and generators


http://www.michael-noll.com/wiki/Writing_An_Hadoop_MapReduce_Program_In_Python#What_we_want_to_do
\r\n
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用Python为Hadoop编写一个简单的MapReduce程序 的相关文章

  • mysql修改root密码

    打开mysql命令终端 MySQL 8 0 Command Line Client xff0c 然后输入密码进入 紧接着输入如下命令 xff0c 可将密码更改为 rootcgcl alter user root 64 localhost i
  • Linux下vncviewer和vncserver的安装

    1 安装vncserver xff08 1 xff09 需要以root用户进行vncserver的安装 xff0c 命令行为 xff1a yum install tigervnc server xff08 2 xff09 安装vncview
  • 怎样把shell结果赋值给变量 | shell 中获取命令语句结果的方式

    怎样把shell结果赋值给变量 xff0c shell 中获取命令语句结果的方式 xff0c 通常采用以下两种方式 xff1a 1 执行符号方式 96 96 如 xff1a a 61 96 echo abc 96 echo a abc 2
  • CSS的替换元素

    CSS的 替换元素 xff1a 通过修改某个元素的属性值呈现的内容就可以被替换的元素 替换元素的特性 xff1a 内容不受页面上的CSS的影响 也就是样式表现在CSS作用域之外大部分有自己默认的尺寸 xff0c lt img gt 标签没有
  • Jack 服务编译问题 Android 7.0

    jack 服务常见错误解决方法 当你编译Android时 xff0c 你不需要修改任何内容 Jack是Andriod M的默认编译工具 只需使用标准的makefile命令执行即可 当第一次执行jack时 xff0c 它会在你的机器上启动一个
  • Tomcat+Nginx+HTTPS

    root 64 lb01 conf d cat etc nginx conf d proxy zrlog oldboy com conf upstream zrlog server 172 16 1 7 8080 server 172 16
  • PHP多进程异步处理复杂接口类似微服务(企业真实案例)

    需求 用户下单 推荐合师傅给用户 类似滴滴派单 场景 在线服务平台有各类技术师傅入驻 顾客在下单后需要根据在线师傅及顾客位置计算推荐总分排行后返回推荐的师傅给用户 问题 1 php fpm 框架 无法多线程工作 2 平台师傅有多个评分属性
  • Swoole数据库连接池分析及实现

    使用PHP swoole 由于其内存常驻及协程特性 一般是需要使用数据库链接池来减少链接创建的开支的 一个连接池的实现难点在哪 下面分析 1 如何判断是否该获取新的链接 A 默认规则一个协程对应一个数据库连接 同一个协程里应该返回同一个链接
  • Api接口数据安全及数据加密方式主要流程实现

    简述接口数据安全的主要实现方式 一 数据校验 常用算法 MD5 SHA1 流程 1 前端生成数据后按照约定方式生成一个sign 校验字段 一般通过MD5或者SHA1 方式 一并提交给后端 2 后端获得参数后通过同样的方式生成sign 然后跟
  • 简述PHP执行流程

    目的 xff1a 本文主要介绍PHP执行流程 目的是梳理php代码是如何最终转换成为机器二进制指令而被执行的 参考文章 xff1a https blog csdn net diavid article details 81035188 PH
  • Java为啥比PHP快?

    一直都说php比java要慢 今天从理论跟实际测试看看php是否真的慢 慢在哪里 一 运行模式对比 java 一般用java 语言开发的网站项目都是以命令行模式运行 部分可能以可执行文件 xff08 exe xff09 的形式运行 php
  • PHP微服务 hyperf+nacos使用

    PHP微服务 hyperf 43 nacos使用 这里简单说下微服务 及架构方面东西 1 微服务对php 43 fpm 模式意义不是很大 原因就是php 43 fpm 天生支持模块拆分 热更新 如果只是性能上的考虑 那php 43 fpm
  • PHP项目临时拓容Nginx负载均衡实操记录

    项目域名 test baidu com 服务器A 127 0 0 1 内网ip 原有服务器 服务器B 172 30 228 254 内网ip 需求 项目本在服务器A中正常运行 现在临时搞活动 需要拓容一台 多台服务器 在最小成本跟改动下完成
  • layui templet中html标签获取js全局变量方法

    开发中涉及layui中 xff0c 在使用到table的模板方法时templet xff0c 会遇到其内部除了使用table field xff08 此处通过d 来获取 xff0c 就不啰嗦了 xff09 然后如果想获取某个外部js中的全局
  • PHP分布式部署代码同步Git实现

    PHP 分布式部署后 代码自动同步实现 项目架构如下 需要更新代码时我们只需要把代码传到主服务器后通过定时任务主服务器自动push 代码到Git服务端 之后其他从服务器则自动从Git云端拉取最新的代码即可 需要用到 expect 软件 安装
  • nginx 负载均衡502问题

    项目架构 nginx 43 php fpm 负载均衡 负载均衡关键配置如下 引入负载均衡配置 include proxy conf 负载均衡 upstream test balance server 172 28 196 xxx 80 we
  • 用Android 动画 演示冒泡排序

    之前面试遇到的一道机试题 当时时间不够没有调出来 有时间把它整了一下 代码 public class MainActivity extends ActionBarActivity implements OnClickListener pri
  • 教你怎么阅读外文文献

    转载自 http www douban com group topic 14551517 NO 1 中科院大博士是如何进行文献检索和阅读的 xff08 好习惯受益终生 xff09 一 如何进行文献检索 我是学自然科学的 xff0c 平时确实
  • webpack打包时提示Invalid configuration object错误

    初学者如果是通过网上教程来学习webpack xff0c 第一次用webpack打包时通常会遇到下面这样的问题 xff1a 实际上出错信息已经说明了问题原因 xff1a Invalid configuration object Webpac
  • Maven核心概念(1)--坐标

    注 xff1a 转载时请注明原作者 lreis2010 及出处 http blog csdn net lreis2010 xff01 作者初次接触Maven是希望有一种方式能够自动化地管理项目中使用的Jar包 随着对于Maven的学习 xf

随机推荐

  • 【UML】四种关系

    一 在学习UML中的时候含有的四种关系是 xff1a 关联Association xff1a 是一种结构化的关系 xff0c 指一种对象和另一种对象有联系 xff0c 给定关联的两个类 xff0c 可以从其中的一个类的对象访问到另一个类的相
  • vnc,在windows系统上安装vnc,操作教程

    VNC是一款可以实现远程桌面控制 方面很实用的小工具 xff0c 今天给大家分享如何在在windows系统上安装vnc的操作方法 xff1a 小编在这里用到了 xff1a IIS7服务器管理工具来操作的 具体操作的如下 xff1a 一 首先
  • 51单片机手动自动智能窗户窗帘控制系统手动自动定时

    实践制作DIY GC 00 45 智能窗户窗帘控制系统 一 功能说明 xff1a 基于 51 单片机设计 智能窗户窗帘控制系统 二 功能介绍 xff1a STC89C52 AT89C52 系列最小系统板 43 5VUSB电源 43 ULN2
  • linux下 bash-completion 离线安装(Ubuntu或centos )

    bash completion 安装 实现k8s命令自动补全 xff0c 我们需要安装bash completion 在github下载离线包 下载地址解压 tar xvJf bash completion 2 11 tar xz 命令补全
  • ROS自定义地图(CAD、手绘等)

    0x00 概述 在前面的文章中 xff0c 我们介绍如何自动导航时 xff0c 都是基于使用gmapping或者hector mapping创建的地图 当然使用其他的建图方法创建的地图也可以 xff0c 但是目前为止 xff0c 无论使用哪
  • STM32 控制蜂鸣器播放音乐的原理和实例

    STM32 控制蜂鸣器播放音乐的原理和实例 本文通过将乐谱里的每个音符的声音频率和声音时长保存在两个数组里面 1 使用通用定时器TIM4实现无中断的微秒级延时函数 xff0c 控制每个音符的发声时长 2 使用系统滴答时钟Systick实现带
  • 影响力最大化——CELF算法的简介与python实现

    CELF算法是Leskovecl等人利用IC模型的子模特性对爬山贪心算法进一步改进得到的优化算法 子模函数的定义为 任意函数f 将有限集合映射为非负实数集并且满足收益递减特性即为子模函数 设集合s T xff0c 任意元素v添加到集合S中获
  • Qos队列调度算法(SP/WRR/DWRR)

    本文重点分析sonic中支持的三种Qos队列调度算法 xff1a 1 SP xff08 Strict Priority xff0c 严格优先级 xff09 也称为PQ xff08 Priority Queuing xff09 调度 xff0
  • python的MapReduce的应用案例

    在学习这个项目中用到许多数学公式 xff0c 有的自己不太懂 xff0c 所以上传上来进行实地应用 参考资料 generate train feature map py usr bin env python encoding 61 UTF
  • 索赔649亿!GitHub Copilot惹上官司,被指控侵犯代码版权,是开源社区“寄生虫”...

    大数据文摘授权转载自AI前线 整理 xff1a 刘燕 xff0c 核子可乐 一位 20 年老开源程序员 xff1a GitHub Copilot 就是开源社区的 寄生虫 GitHub 面临集体起诉 xff0c 索赔 647 亿 GitHub
  • SDN网络技术:OpenFlow协议(1)

    本文首发于我的公众号码农之屋 xff08 id Spider1818 xff09 xff0c 专注于干货分享 xff0c 包含但不限于Java编程 网络技术 Linux内核及实操 容器技术等 欢迎大家关注 xff0c 二维码文末可以扫 导读
  • Ubuntu、debian安装图形界面,输入法,解决远程桌面卡顿问题

    安装图形界面 tasksel选择安装Ubuntu Desktopapt get install xrdp tigervnc standalone server安装远程接入systemctl start xrdpsystemctl enabl
  • JS 异步 ( 一、异步概念、Web worker 基本使用 )

    相关阅读 xff1a JS 异步 一 异步概念 Web worker 基本使用 JS 异步 二 Promise 的用法 手写模拟 Promise JS 异步 三 generator 的用法 async await 的用法 文章目录 异步异步
  • eve-ng 自定义linux镜像

    文章目录 1 创建目录2 上传镜像并改名3 创建虚拟磁盘qcow24 登录eve网页5 查找lab UUID和虚拟机编号6 将系统提交成模板7 压缩镜像 xff08 可选 xff09 1 创建目录 root 64 eve ng opt un
  • 百度地图Marker的定位和方向

    原文 xff1a http bbs lbsyun baidu com forum php mod 61 viewthread amp tid 61 83704 今天做百度地图需要在显示很多车辆的位置信息 并显示车辆的角度和行驶方向 需要用到
  • ELFhash - 优秀的字符串哈希算法

    1 字符串哈希 xff1a 我们先从字符串哈希说起 在很多的情况下 xff0c 我们有可能会获得大量的字符串 xff0c 每个字符串有可能重复也有可能不重复 C不像Python有字典类型的数据结构 xff0c 我们没有办法吧字符串当做是键值
  • 详解TensorFlow数据读取机制(附代码)

    在学习TensorFlow的过程中 xff0c 有很多小伙伴反映读取数据这一块很难理解 确实这一块官方的教程比较简略 xff0c 网上也找不到什么合适的学习材料 今天这篇文章就以图片的形式 xff0c 用最简单的语言 xff0c 为大家详细
  • Linux下安装boa服务器遇到的问题

    最近在CentOS7机器上安装boa服务器的时候 xff0c 遇到了不少问题 xff0c 在这里记录一下 1 从官网下载最新 boa源码包 xff0c 网址 xff1a http www boa org xff1b 2 解压 xff0c 进
  • 【linux】查看Linux系统版本信息的几种方法

    一 查看Linux内核版本命令 xff08 两种方法 xff09 xff1a 1 cat proc version 2 uname a 二 查看Linux系统版本的命令 xff08 3种方法 xff09 xff1a 1 lsb releas
  • 如何使用Python为Hadoop编写一个简单的MapReduce程序

    转载自 xff1a http asfr blogbus com logs 44208067 html 在这个实例中 xff0c 我将会向大家介绍如何使用Python 为 Hadoop编写一个简单的 MapReduce 程序 尽管 Hadoo