Hadoop 流 - 从减速器输出中删除尾随选项卡

2023-11-22

我有一个 hadoop 流作业,其输出不包含键/值对。您可以将其视为仅值对或仅键对。

我的流式减速器(一个 php 脚本)正在输出由换行符分隔的记录。 Hadoop 流处理将此视为没有值的键,并在换行符之前插入一个制表符。这个额外的选项卡是不需要的。

我该如何删除它?

我正在将 hadoop 1.0.3 与 AWS EMR 结合使用。我下载了hadoop 1.0.3的源代码,并在hadoop-1.0.3/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java中找到了这段代码:

reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8");

所以我尝试通过-D stream.reduce.output.field.separator=作为这项工作的论据,但运气不佳。我也尝试过-D mapred.textoutputformat.separator= and -D mapreduce.output.textoutputformat.separator=没有运气。

我当然搜索过谷歌,但没有找到任何有用的东西。一项搜索结果甚至指出没有可以传递的参数来实现所需的结果(尽管在这种情况下,hadoop 版本确实非常旧)。

这是我的代码(为了可读性添加了换行符):

hadoop jar streaming.jar -files s3n://path/to/a/file.json#file.json
    -D mapred.output.compress=true -D stream.reduce.output.field.separator=
    -input s3n://path/to/some/input/*/* -output hdfs:///path/to/output/dir
    -mapper 'php my_mapper.php' -reducer 'php my_reducer.php'

为了对其他人有帮助,使用上面的提示,我能够实现:

CustomOutputFormat<K, V> extends org.apache.hadoop.mapred.TextOutputFormat<K, V> {....}

将“getRecordWriter”的内置实现中的一行更改为:

String keyValueSeparator = job.get("mapred.textoutputformat.separator", ""); 

代替:

String keyValueSeparator = job.get("mapred.textoutputformat.separator", "\t"); 

将其编译成 Jar 并将其包含到我的 hadoop 流式调用中(通过 hadoop 流式处理上的说明)后,该调用如下所示:

hadoop   jar  /usr/lib/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar     \
-archives 'hdfs:///user/the/path/to/your/jar/onHDFS/theNameOfTheJar.jar' \
-libjars theNameOfTheJar.jar \
-outputformat com.yourcompanyHere.package.path.tojavafile.CustomOutputFormat  \
-file yourMapper.py    -mapper  yourMapper.py     \
-file yourReducer.py   -reducer yourReducer.py    \
-input $yourInputFile    \
-output $yourOutputDirectoryOnHDFS

我还将该 jar 包含在我发出该调用的文件夹中。

它非常适合我的需求(并且它在减速器之后的行尾没有创建任何选项卡)。


更新:根据暗示这确实对其他人有帮助的评论,这是我的 CustomOutputFormat.java 文件的完整源代码:

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

public class CustomOutputFormat<K, V> extends TextOutputFormat<K, V> {

    public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name,
        Progressable progress) throws IOException {
    boolean isCompressed = getCompressOutput(job);

    //Channging the default from '\t' to blank
    String keyValueSeparator = job.get("mapred.textoutputformat.separator", ""); // '\t'
    if (!isCompressed) {
        Path file = FileOutputFormat.getTaskOutputPath(job, name);
        FileSystem fs = file.getFileSystem(job);
        FSDataOutputStream fileOut = fs.create(file, progress);
        return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
    } else {
        Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
            GzipCodec.class);
        // create the named codec
        CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
        // build the filename including the extension
        Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
        FileSystem fs = file.getFileSystem(job);
        FSDataOutputStream fileOut = fs.create(file, progress);
        return new LineRecordWriter<K, V>(new DataOutputStream(
            codec.createOutputStream(fileOut)), keyValueSeparator);
    }
    }
}

仅供参考:对于您的使用上下文,请务必检查这不会对映射器和化简器之间的 hadoop-streaming 托管交互(就分离键与值而言)产生不利影响。澄清:

  • 根据我的测试 - 如果您的数据的每一行都有一个“选项卡”(每侧都有一些内容),您可以保留内置默认值:流式传输会将第一个选项卡之前的第一个内容解释为你的“键”,以及它后面那一行的所有内容作为你的“值”。因此,它不会看到“空值”,并且不会附加在您的化简器之后显示的选项卡。 (您将看到最终输出根据“键”的值进行排序,流将每行中的“键”解释为每个选项卡之前出现的值。)

  • 相反,如果您的数据中没有选项卡,并且您没有使用上述技巧覆盖默认值,那么您将在运行完成后看到选项卡,上述覆盖将成为修复。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Hadoop 流 - 从减速器输出中删除尾随选项卡 的相关文章

随机推荐

  • 向左浮动div,使所有div的高度等于其行中最高的div?

    我正在尝试一种设计 将一系列同一类的 div 浮动到左侧 我希望 div 适合行 其中每行中的 div 具有相同的高度 以便行之间和设计元素排列之间不会出现破损 有没有办法做到这一点 或者我本质上必须预设每个 div 的高度 我向左浮动是因
  • R 相当于 Python 的 re.findall

    我试图从字符串中获取 RegExp 的所有匹配项 但显然在 R 中这并不容易 或者我忽略了一些东西 说实话 这真的很令人困惑 我发现自己在所有选项中迷失了 str extract str match str match all regexe
  • 如何更改 GNU Make 中的当前目录

    我想将源目录与目标目录分开 看来从 Makefile 更改当前工作目录应该是最简单的解决方案 由于以下缺点 目标的显式路径是不够的 Makefile 中的冗余代码 因为对目标的每个引用都应以变量为前缀 更复杂的命令行来构建特定的中间目标 对
  • Oracle驱动程序内存泄漏-Tomcat

    我们使用的是 tomcat 7 0 33 Spring 3 0 1 和 JPA 使用 tomcat JNDI 数据源 Oracle 10g在后端使用ojdbc6 jar 最新 当我们尝试取消部署应用程序时 一些 Oracle 类似乎正在泄漏
  • 为什么我的 Google 应用引擎的域名为“my-project.df.r.appspot.com”?

    我在 Google Cloud 项目中启用了 Google App Enginemy project App Engine 的 URL 是my project appspot com默认情况下 这很好 然而 我发现有时它会变成my proj
  • 如何判断用户的电子邮件地址是否已使用 Django、allauth、rest-auth 和自定义用户进行验证

    我正在使用 Django 2 0 10 以及 Rest framework rest auth 和 allauth 我有一个自定义用户模型 我已经使用 allauth 视图进行了电子邮件验证 用户注册时会发送验证电子邮件 如果我单击电子邮件
  • 重置继承的WPF样式?

    在我的应用程序的 App xaml 部分中 我有一个ResourceDictionary目标类似的元素DataGridColumnHeader and DataGridCell并对它们应用自定义样式 这些定义是全局的 因为它们不使用 x k
  • AFNetworking 2.0 从失败块中的代码 400 获取 JSON

    我在用着AFHTTPRequestOperationManager for a POST要求 现在我故意输入不正确的信息来处理400错误代码 现在 Web 服务实际上返回了一个JSON并向用户解释他们做错了什么的消息 我非常想得到这个JSO
  • Java集合binarySearch无法正常工作

    我只是尝试使用本机 Java 二进制搜索 希望它总能找到第一个出现的位置 但它并不总是返回第一次出现 我在这里做错了什么 import java util class BinarySearchWithComparator public st
  • 不带参数抛出失败信号

    直接打电话就可以吗throw 如果出现问题 您不知道如何恢复 这个想法是让应用程序因转储而崩溃 因为状态未知 或者你应该总是指定一个参数 从MSDN我只发现如果没有参数它会重新抛出 但不知道如果没有初始异常要重新抛出会发生什么 No thr
  • 当从另一个表中删除行时,如何使 PostgreSQL 将行插入到表中?

    我们有一个应用程序 它将根据用户请求从表中删除一行 我无法更改应用程序代码 但是 我想将一行插入到另一个表 有点像日志日志 中 其中包含来自其他几个表的信息 基于要删除的行的信息 我如何在 PostgreSQL 中实现这一目标 写一个触发函
  • Nuget 找不到更新的依赖项

    我刚刚在 ASP 5 MVC 6 beta8 中创建了一个新项目和一个用于测试的兼容类库 问题出现在我打算用于测试的这个新的 Web 类库 项目中 这是我的project json 的样子 version 1 0 0 description
  • Grails hasOne 与belongsTo

    要在 Grails 中创建一对一关系 我可以这样做 class Person static hasOne address Address 在这种情况下 地址表拥有其个人的密钥 我还可以这样做 class Address static bel
  • 将焦点设置在android中listview的任何项目上

    我有一个列表视图 其中包含文本视图作为其元素 现在我希望在启动应用程序时自动聚焦列表的第一项 当我单击其他视图 例如按钮 时 如何将焦点设置在列表中的任何项目上 设置选择和设置焦点是两个不同的事情 如果您只想将选择设置为某个项目 那么您可以
  • 由于 CORS 限制,无法使用 firebase 进行本地测试

    我当前的用例很简单 我只需要向我本地开发的云函数发出post请求 问题是 当我开火时 firebase serve 托管部署在本地主机 5000 并且云功能部署在本地主机 5001 由于端口不同 这两者来自不同的来源 因此 当浏览器发送初始
  • 如何处理对数图中的零

    问题 我想使用 ggplot2 将数据绘制在 y 轴上具有对数刻度的折线图中 不幸的是 我的一些价值观一路下降到零 数据表示依赖于某些参数的特征的相对出现 当在样本中没有观察到该特征时 值为零 这意味着它很少出现 或者实际上从未出现 这些零
  • Android游戏RPG库存系统

    我使用 ArrayList 作为我的 库存 我无法找到一种方法来添加多个相同的物品而不占用 库存 中的位置 例如 我在库存中添加了一瓶药水 现在我添加了另一种药水 但这次不是在库存中添加另一种药水 而是应该显示我有 药水 x 2 同时只占用
  • 获取 Urllib2.Request 的请求标头?

    有没有办法从使用 Urllib2 创建的请求中获取标头或确认使用 urllib2 urlopen 发送的 HTTP 标头 查看请求 和响应标头 的一种简单方法是启用调试输出 opener urllib2 build opener urlli
  • llvm JIT 将库添加到模块

    我正在开发一个使用 LLVM 的 JIT 该语言有一个用 C 编写的小型运行时 我使用 clang 将其编译为 LLVM IR clang runtime cu cuda gpu arch sm 50 c emit llvm 然后加载 bc
  • Hadoop 流 - 从减速器输出中删除尾随选项卡

    我有一个 hadoop 流作业 其输出不包含键 值对 您可以将其视为仅值对或仅键对 我的流式减速器 一个 php 脚本 正在输出由换行符分隔的记录 Hadoop 流处理将此视为没有值的键 并在换行符之前插入一个制表符 这个额外的选项卡是不需