First time using Hadoop: MapReduce job does not run the reduce phase

2023-12-11

I wrote a simple MapReduce job that reads data from DFS and runs a simple algorithm on it. While trying to debug it, I decided to simply have the mapper output one set of keys and values and the reducer output an entirely different set. I am running this job on a single-node Hadoop 0.20.2 cluster. When the job completes, the output contains only the values written by the mapper, which leads me to believe the reducer is never run. I would appreciate it if anyone could explain why my code produces this output.

I have tried setting outputKeyClass and outputValueClass to different things, as well as setMapOutputKeyClass and setMapOutputValueClass. At the moment the commented-out sections of the code are the algorithm I actually want to run; I have changed the map and reduce methods to simply output certain values, yet the job's output still contains only the values written by the mapper. Here is the class I use to run the job:

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class CalculateHistogram {

    public static class HistogramMap extends Mapper<LongWritable, Text, LongWritable, Text> {

        private static final int R = 100;
        private int n = 0;

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            if (n == 0) {
                StringTokenizer tokens = new StringTokenizer(value.toString(), ",");
                int counter = 0;
                while (tokens.hasMoreTokens()) {
                    String token = tokens.nextToken();
                    if (tokens.hasMoreTokens()) {
                        context.write(new LongWritable(-2), new Text("HI"));
                        //context.write(new LongWritable(counter), new Text(token));
                    }
                    counter++;
                    n++;
                }
            } else {
                n++;
                if (n == R) {
                    n = 0;
                }
                
            }
        }
    }

    public static class HistogramReduce extends Reducer<LongWritable, Text, LongWritable, HistogramBucket> {

        private final static int R = 10;

        public void reduce(LongWritable key, Iterator<Text> values, Context context)
                                            throws IOException, InterruptedException {
            if (key.toString().equals("-1")) {
                //context.write(key, new HistogramBucket(key));
            }
            Text t = values.next();
            for (char c : t.toString().toCharArray()) {
                if (!Character.isDigit(c) && c != '.') {
                    //context.write(key, new HistogramBucket(key));//if this isnt a numerical attribute we ignore it
                }
            }
            context.setStatus("Building Histogram");
            HistogramBucket i = new HistogramBucket(key);
            i.add(new DoubleWritable(Double.parseDouble(t.toString())));
            while (values.hasNext()) {
                for (int j = 0; j < R; j++) {
                    t = values.next();
                }
                if (!i.contains(Double.parseDouble(t.toString()))) {
                    context.setStatus("Writing a value to the Histogram");
                    i.add(new DoubleWritable(Double.parseDouble(t.toString())));
                }
            }
            
            context.write(new LongWritable(55555555), new HistogramBucket(new LongWritable(55555555)));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: CalculateHistogram <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "MRDT - Generate Histogram");
        job.setJarByClass(CalculateHistogram.class);
        job.setMapperClass(HistogramMap.class);
        job.setReducerClass(HistogramReduce.class);

        //job.setOutputValueClass(HistogramBucket.class);
        
        //job.setMapOutputKeyClass(LongWritable.class);
        //job.setMapOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The signature of your reduce method is wrong. It takes an Iterator<Text>, but the new (org.apache.hadoop.mapreduce) API passes an Iterable<Text>.

Because the signatures do not match, your method does not override the reduce method of the Reducer base class. The framework therefore calls the base class's implementation, which is an identity function: it writes the mapper's output straight through, which is exactly what you are seeing.

Put the @Override annotation on the method to catch mistakes like this at compile time.
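
A minimal sketch of the fix, keeping the question's types; the loop body is a placeholder, not the full histogram logic:

@Override
public void reduce(LongWritable key, Iterable<Text> values, Context context)
                                    throws IOException, InterruptedException {
    // The new-API Reducer hands the values in as an Iterable<Text>,
    // so iterate with a for-each loop instead of calling next() directly.
    for (Text value : values) {
        // ... histogram logic from the question goes here ...
    }
}

With @Override in place, the original Iterator<Text> version no longer compiles, because the compiler can verify that the method does not override anything in Reducer. As a side note (an inference from the mismatched mapper and reducer value types, not something stated in the answer above): once the reducer actually runs, the commented-out driver lines will likely need to be restored, i.e. job.setOutputValueClass(HistogramBucket.class) together with job.setMapOutputValueClass(Text.class), since the mapper emits Text values while the reducer emits HistogramBucket values.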
