My Hadoop mapper class is not executed when parsing XML in Hadoop with XMLInputFormat


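I'm new to Hadoop. I'm using Hadoop 2.6.0 and trying to parse a complex XML file. After some searching, I learned that XML parsing needs a custom InputFormat, namely Mahout's XMLInputFormat. I also followed this example: http://xmlandhadoop.blogspot.in/

However, when I run my code with the XMLInputFormat class from that example set on the job, my own Mapper class is never called and the output file contains no data.

Surprisingly, if I don't set the XMLInputFormat class on the job, my mapper runs fine and produces the expected output. Could someone point out what I'm missing here?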


public static void runParserJob(String inputPath, String outputPath) throws IOException {
    Configuration configuration = new Configuration();
    configuration.set("xmlinput.start", Constants.XML_INPUT_START_TAG_PRODUCT);

    Job job = new Job(configuration, Constants.JOB_TITLE);
    FileInputFormat.setInputPaths(job, inputPath);
    Path hdfsOutputPath = new Path(outputPath);
    FileOutputFormat.setOutputPath(job, hdfsOutputPath);
    FileSystem dfs = FileSystem.get(hdfsOutputPath.toUri(), configuration);
    // Delete any older data at the output location so the job can
    // write its output to the same place.
    if (dfs.exists(hdfsOutputPath)) {
        dfs.delete(hdfsOutputPath, true);
    }

    // ... mapper, input format, and output settings omitted ...

    try {
        job.waitForCompletion(true);
    } catch (InterruptedException ie) {
        LOGGER.error("-----Process interrupted in between Exception-----", ie);
    } catch (ClassNotFoundException ce) {
        LOGGER.error("-----Class not found while running the job-----", ce);
    }
}

My XMLInputFormat class is:

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class XmlInputFormat extends TextInputFormat {

    public static final String START_TAG_KEY = "xmlinput.start";
    public static final String END_TAG_KEY = "xmlinput.end";

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit is, TaskAttemptContext tac) {
        return new XmlRecordReader();
    }

    public static class XmlRecordReader extends RecordReader<LongWritable, Text> {
        private byte[] startTag;
        private byte[] endTag;
        private long start;
        private long end;
        private FSDataInputStream fsin;
        private DataOutputBuffer buffer = new DataOutputBuffer();
        private LongWritable key = new LongWritable();
        private Text value = new Text();

        @Override
        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
                throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) inputSplit;
            startTag = taskAttemptContext.getConfiguration().get(START_TAG_KEY).getBytes("utf-8");
            endTag = taskAttemptContext.getConfiguration().get(END_TAG_KEY).getBytes("utf-8");

            start = fileSplit.getStart();
            end = start + fileSplit.getLength();
            Path file = fileSplit.getPath();

            FileSystem hdfs = file.getFileSystem(taskAttemptContext.getConfiguration());
            fsin = hdfs.open(fileSplit.getPath());
            // Start reading at the beginning of this split.
            fsin.seek(start);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (fsin.getPos() < end) {
                // Skip ahead to the next start tag, then buffer up to the end tag.
                if (readUntilMatch(startTag, false)) {
                    try {
                        buffer.write(startTag);
                        if (readUntilMatch(endTag, true)) {
                            value.set(buffer.getData(), 0, buffer.getLength());
                            return true;
                        }
                    } finally {
                        buffer.reset();
                    }
                }
            }
            return false;
        }

        @Override
        public void close() throws IOException {
            fsin.close();
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return null;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return null;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return 0;
        }

        private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
            int i = 0;
            while (true) {
                int b = fsin.read();
                // End of file reached.
                if (b == -1) {
                    return false;
                }
                // Otherwise save the byte into the buffer while inside a record.
                if (withinBlock) {
                    buffer.write(b);
                }
                // Check if we're matching.
                if (b == match[i]) {
                    i++;
                    if (i >= match.length) return true;
                } else {
                    i = 0;
                }
                // See if we've passed the stop point.
                if (!withinBlock && i == 0 && fsin.getPos() >= end) return false;
            }
        }
    }
}



I'm not sure what your XML structure looks like, but suppose, for example, that you have an XML structure like this:

   <product id="101" itemCategory="BER" transaction="PUR">
       <time-stamp>2015-04-20 11:12:13 102301</time-stamp>
       ...
   </product>

Then your XMLInputFormat class needs to know which XML node you want to work with:

configuration.set("xmlinput.start", "<product");   // note: only <product, without the closing >, because the element has attributes
configuration.set("xmlinput.end", "</product>");   // note: the full closing tag </product>
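
To see why the start tag is given as "<product" rather than "<product>", here is a minimal, stand-alone sketch of the byte-by-byte tag matching that a record reader of this kind performs. It is not the Mahout class: XmlRecordSketch, extract, and the <catalog> wrapper element are names made up for illustration, and it reads from an in-memory string instead of an HDFS split.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: start/end-tag record matching on an in-memory stream. */
final class XmlRecordSketch {

    /**
     * Reads bytes until all of `match` has been seen. If `keep` is non-null,
     * every byte read is also appended to it. Returns false on end of input.
     */
    static boolean readUntilMatch(ByteArrayInputStream in, byte[] match,
                                  ByteArrayOutputStream keep) {
        int i = 0;
        while (true) {
            int b = in.read();
            if (b == -1) return false;        // end of input, no match
            if (keep != null) keep.write(b);  // inside a record: buffer the byte
            if (b == match[i]) {
                i++;
                if (i >= match.length) return true;
            } else {
                i = 0;                        // mismatch: start matching again
            }
        }
    }

    /** Returns every fragment that runs from startTag through endTag. */
    static List<String> extract(String xml, String startTag, String endTag) {
        byte[] start = startTag.getBytes(StandardCharsets.UTF_8);
        byte[] end = endTag.getBytes(StandardCharsets.UTF_8);
        ByteArrayInputStream in =
                new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8));
        List<String> records = new ArrayList<>();
        // Skip ahead to the next start tag; stop when none is left.
        while (readUntilMatch(in, start, null)) {
            ByteArrayOutputStream record = new ByteArrayOutputStream();
            // The start tag was consumed while skipping, so put it back first.
            record.write(start, 0, start.length);
            if (!readUntilMatch(in, end, record)) break;  // unterminated record
            records.add(new String(record.toByteArray(), StandardCharsets.UTF_8));
        }
        return records;
    }

    public static void main(String[] args) {
        // <catalog> is a hypothetical root element, not taken from the question.
        String xml = "<catalog>"
                + "<product id=\"101\" itemCategory=\"BER\" transaction=\"PUR\">"
                + "<time-stamp>2015-04-20 11:12:13 102301</time-stamp></product>"
                + "<product id=\"102\"><time-stamp>2015-04-21 09:00:00 000001</time-stamp></product>"
                + "</catalog>";
        System.out.println(extract(xml, "<product", "</product>").size());  // prints 2
        System.out.println(extract(xml, "<product>", "</product>").size()); // prints 0
    }
}
```

With "<product>" as the start tag, the space that follows "<product" in the input breaks the match every time, so no record is ever found. In a record reader built on the same matching, nextKeyValue() would then return false on its first call and the framework would have nothing to pass to the mapper, which fits the empty output described in the question.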



