使用 XMLInputFormat 在 hadoop 中解析 xml 时不执行我的 hadoop 映射器类

2024-02-05

我是 hadoop 新手,使用 Hadoop 2.6.0 版本并尝试解析复杂的 XML。 经过一段时间的搜索,我了解到,对于 XML 解析,我们需要编写自定义的 InputFormat,即 mahout 的 XMLInputFormat。 我也得到了帮助这个例子 http://xmlandhadoop.blogspot.in/

但是,当我在传递 XMLInputformat 类之后运行代码时,如果我使用示例中给出的 XMLInputFormat,它不会调用我自己的 Mapper 类,并且输出文件中的数据为 0。

令人惊讶的是,如果我不将 XMLInputFormat 类传递给我的作业,那么我的映射器可以正常工作并正确提供输出。有人会在这里帮忙指出我在这里缺少什么吗?

我的工作配置类是:

public static void runParserJob(String inputPath, String outputPath) throws IOException {
    LOGGER.info("-----runParserJob()-----Start");
    Configuration configuration = new Configuration();         configuration.set("xmlinput.start",Constants.XML_INPUT_START_TAG_PRODUCT);
    configuration.set("xmlinput.end",Constants.XML_INPUT_END_TAG_PRODUCT);
    configuration.set("io.serializations",Constants.XML_IO_SERIALIZATIONS);

    Job job = new Job(configuration,Constants.JOB_TITLE);
    FileInputFormat.setInputPaths(job, inputPath);
    job.setJarByClass(ParserDriver.class);
    job.setMapperClass(XMLMapper.class);
    job.setNumReduceTasks(0);
    job.setInputFormatClass(XmlInputFormat.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);
    Path hdfsOutputPath = new Path(outputPath);
    FileOutputFormat.setOutputPath(job, hdfsOutputPath);
    FileSystem dfs = FileSystem.get(hdfsOutputPath.toUri(),configuration);
    /**Using this condition it will create output at same location 
     * by deleting older data in that location**/
    if(dfs.exists(hdfsOutputPath)){
        dfs.delete(hdfsOutputPath,true);
    }
    try{
        job.waitForCompletion(true);
    }catch(InterruptedException ie){
        LOGGER.error("-----Process interrupted in between Exception-----", ie);
    }catch(ClassNotFoundException ce){
        LOGGER.error("-----Class not found while running the job-----",ce);
    }
}

我的 XMLInputFormat 类是:

public class XmlInputFormat extends TextInputFormat{

public static final String START_TAG_KEY = "xmlinput.start";
public static final String END_TAG_KEY = "xmlinput.end";

@Override
public RecordReader<LongWritable,Text> createRecordReader(InputSplit is, TaskAttemptContext tac)  {
    return new XmlRecordReader();
}

public static class XmlRecordReader extends RecordReader<LongWritable, Text>{
    private byte[] startTag;
    private byte[] endTag;
    private long start;
    private long end;
    private FSDataInputStream fsin;
    private DataOutputBuffer buffer = new DataOutputBuffer();
    private LongWritable key = new LongWritable();
    private Text value = new Text();

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit)inputSplit;
        startTag = taskAttemptContext.getConfiguration().get(START_TAG_KEY).getBytes("utf-8");
        endTag = taskAttemptContext.getConfiguration().get(END_TAG_KEY).getBytes("utf-8");

        start = fileSplit.getStart();
        end = start + fileSplit.getLength();
        Path file = fileSplit.getPath();

        FileSystem hdfs = file.getFileSystem(taskAttemptContext.getConfiguration());
         fsin = hdfs.open(fileSplit.getPath());
         fsin.seek(start);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if(fsin.getPos() < end){
            if(readUntilMatch(startTag,false)){
              try {
                    buffer.write(startTag);
                    if (readUntilMatch(endTag, true)) {
                        value.set(buffer.getData(), 0, buffer.getLength());
                        key.set(fsin.getPos());
                        return true;
                    }
                  } finally {
                    buffer.reset();
                  }
            }
        }
        return false;
    }

    @Override
    public void close() throws IOException {

    }

    @Override
    public LongWritable getCurrentKey() throws IOException,InterruptedException {
        return null;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return null;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException{
        int i = 0;
        while(true){
            int b = fsin.read();
            //If reaches to EOF
            if(b == -1){
                return false;
            }   
            //If not then save into the buffer.
            if(withinBlock){
                buffer.write(b);
            }
            // check if we're matching:
            if (b == match[i]) {
              i++;
              if (i >= match.length) return true;
            } else i = 0;
            // see if we've passed the stop point:
            if (!withinBlock && i == 0 && fsin.getPos() >= end) return false;
        }
    }

}

}


有人可以帮我从这里出去吗?提前致谢。如果我哪里出错了请纠正我。


我不确定您的 XML 结构是什么样子,但例如,如果您有一个 XML 结构:

<data>
   <product id="101" itemCategory="BER" transaction="PUR">
       <transaction-id>102A5RET</transaction-id>
       <item-name>Blue-Moon-12-PK-BTTLE</item-name>
       <item-purchased>2</item-purchased>
       <item-price>12.99</item-price>
       <time-stamp>2015-04-20 11:12:13 102301</time-stamp>
   </product>
   .
   .
   .
</data>

您的 XMLInputFormat 类需要知道您想要使用哪个 XML 节点:

configuration.set("xmlinput.start", "<product") //note only <product
configuration.set("xmlinput.end", "</product>") //note only </product>

希望这会有所帮助!

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 XMLInputFormat 在 hadoop 中解析 xml 时不执行我的 hadoop 映射器类 的相关文章

  • Spring Batch 多线程 - 如何使每个线程读取唯一的记录?

    这个问题在很多论坛上都被问过很多次了 但我没有看到适合我的答案 我正在尝试在我的 Spring Batch 实现中实现多线程步骤 有一个包含 100k 条记录的临时表 想要在 10 个线程中处理它 每个线程的提交间隔为 300 因此在任何时
  • JAXb、Hibernate 和 beans

    目前我正在开发一个使用 Spring Web 服务 hibernate 和 JAXb 的项目 1 我已经使用IDE hibernate代码生成 生成了hibernate bean 2 另外 我已经使用maven编译器生成了jaxb bean
  • Liferay ClassNotFoundException:DLFileEntryImpl

    在我的 6 1 0 Portal 实例上 带有使用 ServiceBuilder 和 DL Api 的 6 1 0 SDK Portlet 这一行 DynamicQuery query DynamicQueryFactoryUtil for
  • Mockito when().thenReturn 不必要地调用该方法

    我正在研究继承的代码 我编写了一个应该捕获 NullPointerException 的测试 因为它试图从 null 对象调用方法 Test expected NullPointerException class public void c
  • 如何将 pfx 文件转换为 jks,然后通过使用 wsdl 生成的类来使用它来签署传出的肥皂请求

    我正在寻找一个代码示例 该示例演示如何使用 PFX 证书通过 SSL 访问安全 Web 服务 我有证书及其密码 我首先使用下面提到的命令创建一个 KeyStore 实例 keytool importkeystore destkeystore
  • 总是使用 Final?

    我读过 将某些东西做成最终的 然后在循环中使用它会带来更好的性能 但这对一切都有好处吗 我有很多地方没有循环 但我将 Final 添加到局部变量中 它会使速度变慢还是仍然很好 还有一些地方我有一个全局变量final 例如android Pa
  • 如何在控制器、服务和存储库模式中使用 DTO

    我正在遵循控制器 服务和存储库模式 我只是想知道 DTO 在哪里出现 控制器应该只接收 DTO 吗 我的理解是您不希望外界了解底层域模型 从领域模型到 DTO 的转换应该发生在控制器层还是服务层 在今天使用 Spring MVC 和交互式
  • AWS 无法从 START_OBJECT 中反序列化 java.lang.String 实例

    我创建了一个 Lambda 函数 我想在 API 网关的帮助下通过 URL 访问它 我已经把一切都设置好了 我还创建了一个application jsonAPI Gateway 中的正文映射模板如下所示 input input params
  • 在 Mac 上正确运行基于 SWT 的跨平台 jar

    我一直致力于一个基于 SWT 的项目 该项目旨在部署为 Java Web Start 从而可以在多个平台上使用 到目前为止 我已经成功解决了由于 SWT 依赖的系统特定库而出现的导出问题 请参阅相关thread https stackove
  • Eclipse Java 远程调试器通过 VPN 速度极慢

    我有时被迫离开办公室工作 这意味着我需要通过 VPN 进入我的实验室 我注意到在这种情况下使用 Eclipse 进行远程调试速度非常慢 速度慢到调试器需要 5 7 分钟才能连接到远程 jvm 连接后 每次单步执行断点 行可能需要 20 30
  • Android 中麦克风的后台访问

    是否可以通过 Android 手机上的后台应用程序 服务 持续监控麦克风 我想做的一些想法 不断聆听背景中的声音信号 收到 有趣的 音频信号后 执行一些网络操作 如果前台应用程序需要的话 后台应用程序必须能够智能地放弃对麦克风的访问 除非可
  • 如何从指定日期获取上周五的日期? [复制]

    这个问题在这里已经有答案了 如何找出上一个 上一个 星期五 或指定日期的任何其他日期的日期 public getDateOnDay Date date String dayName 我不会给出答案 先自己尝试一下 但是 也许这些提示可以帮助
  • 在mockito中使用when进行模拟ContextLoader.getCurrentWebApplicationContext()调用。我该怎么做?

    我试图在使用 mockito 时模拟 ContextLoader getCurrentWebApplicationContext 调用 但它无法模拟 here is my source code Mock org springframewo
  • Java列表的线程安全

    我有一个列表 它将在线程安全上下文或非线程安全上下文中使用 究竟会是哪一个 无法提前确定 在这种特殊情况下 每当列表进入非线程安全上下文时 我都会使用它来包装它 Collections synchronizedList 但如果不进入非线程安
  • 如何在桌面浏览器上使用 webdriver 移动网络

    我正在使用 selenium webdriver 进行 AUT 被测应用程序 的功能测试自动化 AUT 是响应式网络 我几乎完成了桌面浏览器的不同测试用例 现在 相同的测试用例也适用于移动浏览器 因为可以从移动浏览器访问 AUT 由于它是响
  • 声明的包“”与预期的包不匹配

    我可以编译并运行我的代码 但 VSCode 中始终显示错误 早些时候有一个弹出窗口 我不记得是什么了 我点击了 全局应用 从那以后一直是这样 Output is there but so is the error The declared
  • 静态变量的线程安全

    class ABC implements Runnable private static int a private static int b public void run 我有一个如上所述的 Java 类 我有这个类的多个线程 在里面r
  • 如何修复 JNLP 应用程序中的“缺少代码库、权限和应用程序名称清单属性”?

    随着最近的 Java 更新 许多人都遇到了缺少 Java Web Start 应用程序的问题Codebase Permissions and Application name体现属性 尽管有资源可以帮助您完成此任务 但我找不到任何资源综合的
  • 节拍匹配算法

    我最近开始尝试创建一个移动应用程序 iOS Android 它将自动击败比赛 http en wikipedia org wiki Beatmatching http en wikipedia org wiki Beatmatching 两
  • Spring Boot @ConfigurationProperties 不从环境中检索属性

    我正在使用 Spring Boot 1 2 1 并尝试创建一个 ConfigurationProperties带有验证的bean 如下所示 package com sampleapp import java net URL import j

随机推荐