TFIDF算法Hadoop实现

2023-11-13

程序说明:利用MapReduce计算框架,计算一组英文文档中各个单词的TFIDF。某单词在某文档的TFIDF=该单词中该文档的TF×该单词IDF。其中,
  - TF(i,j):单词i在文档j中出现的频率(Term Frequency)。TF(i,j)=N(i,j)/N(j),N(i,j)是单词i中文档j中出现的次数,N(j)是文档j的单词总数。
  - IDF(i):单词i的逆文件频率(Inverse Document Frequency)。IDF(i)=LOG((M+1)/M(i)),M是文件总数,M(i)是包含单词i的文件数。M+1,为了避免M与M(i)相等,导致对数为零的情况。

 程序结构:程序由3个Job,按照chain mapreduce进行计算。其中,
  - Job1:包括Tfmapper、Tfreducer、Tfgroup,计算单词在文档的TF。
  - Job2:包括Idfmapper、Idfreducer、Idfsort,计算单词的IDF。
  - Job3:包括Tfidf_tfmapper、Tfidf_idfmapper、Tfidfreducer、Tfidfsort、Tfidfgroup,计算单词的TFIDF。
 
 代码参考:
   - http://blog.csdn.net/jackydai987/article/details/6303459

   - blog.csdn.net/ididcan/article/details/6657977


 TFIDF源代码内容:

1)编写Docword类,把相关单词、对应文件名、相关指标(如IDF、TFIDF等等)作为属性。

<span style="font-size:18px;">package tfidf;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Docword implements WritableComparable<Docword> {
    String docname;
    String word;
    double index;
    static final String DELIMITER=",";
    public Docword() {
        super();
        // TODO Auto-generated constructor stub
    }

    public String getDocname() {
        return docname;
    }

    public void setDocname(String docname) {
        this.docname = docname;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public double getIndex() {
        return index;
    }

    public void setIndex(double index) {
        this.index = index;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // TODO Auto-generated method stub
        out.writeUTF(docname);
        out.writeUTF(word);
        out.writeDouble(index);
    }

    @Override
    public String toString() {
        return docname+DELIMITER+word;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // TODO Auto-generated method stub
        docname=in.readUTF();
        word=in.readUTF();
        index=in.readDouble();
    }

    @Override
    public int compareTo(Docword arg0) {
        // TODO Auto-generated method stub
        int result=0;
        result=(result!=0)?result:docname.compareTo(arg0.getDocname());
        result=(result!=0)?result:word.compareTo(arg0.getWord());
        return result;
    }

}</span>



2)TFIDF代码。已有注释,具体如下。

<span style="font-size:18px;">public class Tfidf extends Configured implements Tool {

    static final String DELIMITER=",";

    /* =======================Job1:Tfidf-tf=======================
     * 程序说明:计算文档中各单词TF。程序主要思路借鉴了MapReduce Tutorial的WordCount2.0代码,
     * - 利用Local Resource,将待删除的字符串(如逗号、句号等)读入内存,对split中每个record删除特定字符串。
     * - 设置”全局“变量(Map类的属性),统计文档的单词总数。注意,文档可能由于超过split大小,需要被分为多个split处理。所以Map中”全局“变量仅统计该split中各文档单词数,实际数需要中reduce中按照filename进行合并。
     *
     * 程序输入:HDFS某目录下所有Text英文文档。
     * 程序输出:各文档中单词的TF,按照“filename, wordname, termfreq”格式输出。
     * (Shuffle)排序规则:1)filename。2)wordname。
     * (Shuffle)合并规则:1)filename。2)wordname。
     */

    public static class Tfmapper extends
            Mapper<LongWritable, Text, Docword, IntWritable> {
        Docword outputKey=new Docword();
        IntWritable outputValue=new IntWritable(1);//拆分每个单词,按频次为1,在reduce中分文件名进行合并
        HashSet<String> skipstr=new HashSet<String>(); //存储待删除的字符串
        int termnumindoc=0;//每个split中各文档中单词总数。

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            // TODO Auto-generated method stub
            FileSplit filesplit=(FileSplit)context.getInputSplit();
            String filename=filesplit.getPath().getName();
            outputKey.setDocname(filename);
            outputKey.setWord("!termnum");//!termnum标识该记录是文件总单词数,由于map中已经对记录删除”!“,所以不会重复。
            context.write(outputKey, new IntWritable(termnumindoc));//按照”filename,!termnum,termnumindoc“格式输出。由于!的ASCII码中所有字母之前,按照Docword.compareTo(),经过shuffle后,!termnum记录会出现中该filename所有记录的第一位。
        }
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            String str=value.toString().toLowerCase();
            for(String skip:skipstr){
                str=str.replaceAll(skip, "");//对每个redcord,删除特定字符串
            }
            String []words=StringUtils.split(str, ' ');
            FileSplit filesplit=(FileSplit)context.getInputSplit();//获取InputSplit。每个InputSplit只属于一个InputFile,每个InputFile至少有一个InputSplit。
            String filename=filesplit.getPath().getName();//利用FileSplit,提取输入文件名。

            for(String word:words){
                if(word.trim().isEmpty())//删除空行和空字符串。也可以放在待删除字符串,用正则表达式实现。
                    continue;
                if(word.charAt(0)<'a' || word.charAt(0)>'z')//删除非字母开头的单词。也可以放在待删除字符串,用正则表达式实现。
                    continue;
                termnumindoc++;//文档的单词数加1.
                outputKey.setDocname(filename);//文件名
                outputKey.setWord(word);//单词名
                context.write(outputKey, outputValue);//按照”filename,wordname,1“格式输出到本地文件系统,进行shuffle
            }
        }
        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            // TODO Auto-generated method stub
            String line="";
            BufferedReader fs=new BufferedReader(new FileReader("skipstr"));//读入待剔除的字符串
            while((line=fs.readLine())!=null){
                skipstr.add(line);//字符串逐个加入内存中的HashSet
            }
        }

    }
    public static class Tfcombiner extends
            Reducer<Docword, IntWritable, Docword, IntWritable> {
        IntWritable outputValue=new IntWritable();
        @Override
        protected void reduce(Docword key, Iterable<IntWritable> values,
                Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            int sumall=0;
            for(IntWritable value:values){
                sumall+=value.get();
            }
            outputValue.set(sumall);
            context.write(key, outputValue);
        }

    }
    public static class Tfreducer extends
            Reducer<Docword, IntWritable, Docword, DoubleWritable> {
        DoubleWritable outputValue=new DoubleWritable();
        int termnumindoc=0;//各文档中单词总数。由于文档可能超过split尺寸大小,被拆分在多个split被多个Map统计单词数。在Reduce中对各个Map统计的单词总数进行汇总。
        @Override
        protected void reduce(Docword key, Iterable<IntWritable> values,
                Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            int sumall=0;
            for(IntWritable value:values){
                sumall+=value.get();
            }
            if(key.getWord().equals("!termnum")){//单词名是!termnum,标记该记录是文件的单词总数。
                termnumindoc=sumall;
            }
            else{//单词名不是!termnum,标记该记录是文件实际单词。
                // 由于!的ASCII码中所有字母之前,按照Docword.compareTo(),经过shuffle后,!termnum记录会出现中该filename所有记录的第一位。
                // 计算TF时,分母”文件中单词总数“termnumindoc已有数据。
                outputValue.set((double)1*sumall/termnumindoc);
                context.write(key, outputValue);//按照”filename,wordname,termfreq“输出到HDFS,作为Job2的输入
            }
        }

    }

    public static class Tfpartition extends Partitioner<Docword, IntWritable> {

        @Override
        public int getPartition(Docword key, IntWritable value,
                int numPartitions) {
            // TODO Auto-generated method stub
            return Math.abs((key.getDocname()).hashCode())%numPartitions;
        }

    }

    /* =======================Job2:Tfidf-idf=======================
     * 程序说明:计算各单词idf。程序读入Job1的输出数据,按照"wordname,filename"格式,对各个wordname进行合并,计算word的IDF。
     * - 在run()中,利用HDFS API获取文件总数,并传递给Reduce。
     * - 由于Job1的输出数据按照filename进行排序,顺序读入记录并比较前后两个记录的filename有没有改变。如果没有改变,则文件总数不变;如果有改变,则文件总数加1。
     *
     * 程序输入:Job1的输出文件,格式为“filename, wordname, termfreq”
     * 程序输出:单词的Idf,格式为“wordname, idf”。
     * (Shuffle)排序规则:wordname。
     * (Shuffle)合并规则:wordname。
     */

    public static class Idfmapper extends
            Mapper<LongWritable, Text, Docword, IntWritable> {
        Docword outputKey=new Docword();
        IntWritable outputValue=new IntWritable(1);//拆分单词,按频次为1,在reduce中分文件名进行合并
        int filenum=0;
        String filename="";//文件名。

        @Override
        protected void map(LongWritable key, Text value,
                Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            String []words=StringUtils.split(value.toString(), ',');
            outputKey.setDocname(words[0]);
            outputKey.setWord(words[1]);
            context.write(outputKey, outputValue);
        }

    }
    public static class Idfcombiner extends
            Reducer<Docword, IntWritable, Docword, IntWritable> {
        IntWritable outputValue=new IntWritable();

        @Override
        protected void reduce(Docword key, Iterable<IntWritable> values,
                Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            int sumall=0;
            for(IntWritable value:values){
                sumall+=value.get();
            }
            outputValue.set(sumall);
            context.write(key, outputValue);
        }

    }
    public static class Idfreducer extends
                Reducer<Docword, IntWritable, Text, DoubleWritable> {
            DoubleWritable outputValue=new DoubleWritable();
            Text outputKey=new Text();
            int alldoc=0;//文件总数。

            @Override
            protected void setup(Context context) throws IOException,
                    InterruptedException {
                // TODO Auto-generated method stub
                //将run()中传递的变量读入内存,获得文件总数。
                alldoc=Integer.parseInt(context.getConfiguration().get("filesnum"));
            }

            @Override
            protected void reduce(Docword key, Iterable<IntWritable> values,
                    Context context)
                    throws IOException, InterruptedException {
                // TODO Auto-generated method stub
                //由于Job1的输出数据按照filename进行排序,顺序读入记录并比较前后两个记录的filename有没有改变。
                // 如果没有改变,则文件总数不变;如果有改变,则文件总数加1。
                int termdocnum=0;
                for(IntWritable value:values){
                        termdocnum+=value.get();//单词对应文件数加1。
                }
                outputKey.set(key.getWord());
                outputValue.set((double)Math.log((double)(alldoc+1)/termdocnum));
                context.write(outputKey, outputValue);//输出idf计算结果,输出格式“wordname,idf”
            }
        }
    public static class Idfpartition extends Partitioner<Docword, IntWritable> {

        @Override
        public int getPartition(Docword key, IntWritable value,
                int numPartitions) {
            // TODO Auto-generated method stub
            return Math.abs((key.getWord().hashCode()))%numPartitions;
        }

    }
    public static class Idfsort extends WritableComparator {
        //在shuffle中,所有记录按照wordname进行排序,按照wordname进行合并。
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            // TODO Auto-generated method stub
            Docword lhs=(Docword)a;
            Docword rhs=(Docword)b;
            return lhs.getWord().compareTo(rhs.getWord());
        }

        public Idfsort() {
            super(Docword.class,true);
            // TODO Auto-generated constructor stub
        }
    }

    /* =======================Job3:Tfidf-tfidf=======================
     * 程序说明:计算各单词tfidf。程序利用MultipleInputs分别读入Job1、Job2的输出数据,类似ReduceSide Join在Reduce中进行汇总计算。
     * - MultipleInputs:配置Map,分别读入Job1、Job2的输出数据。其中,读入job2的输入数据后,设置filename为“!alldoc”。由于!的ASCII码小于所有字母,所以同一个word的job2记录在shuffle中排在job1记录前面。
     * - Reduce:设置sortComparator,按照wordname(高优先级)、filename(低优先级)进行排序;设置groupComparator,按照wordname进行合并记录。
     *
     * 程序输入:Job1的输出文件,格式为“filename, wordname, termfreq”;Job2的输出文件,格式为“wordname,idf”。
     * 程序输出:单词的tfIdf,格式为“filename,wordname, idf”。
     * (Shuffle)排序规则:1)wordname。2)filename。
     * (Shuffle)合并规则:wordname。
     */
    public static class Tfidf_tfmapper extends
            Mapper<LongWritable, Text, Docword, Docword> {
        Docword outputKey=new Docword();
        Docword outputValue=new Docword();
        @Override
        protected void map(LongWritable key, Text value,
                Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            String []words=StringUtils.split(value.toString(), ',');
            outputKey.setWord(words[1]);
            outputKey.setDocname(words[0]);
            outputValue.setDocname(words[0]);
            outputValue.setWord(words[1]);
            outputValue.setIndex(Double.parseDouble(words[2]));
            context.write(outputKey, outputValue);//读入Job1的输出文件,格式为“filename, wordname, termfreq”
        }
    }

    public static class Tfidf_idfmapper extends
            Mapper<LongWritable, Text, Docword, Docword> {
        Docword outputValue=new Docword();
        Docword outputKey=new Docword();
        @Override
        protected void map(LongWritable key, Text value,
                Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            String []words=StringUtils.split(value.toString(), ',');
            outputValue.setDocname("!alldoc");
            outputValue.setWord(words[0]);
            outputValue.setIndex(Double.parseDouble(words[1]));
            outputKey.setWord(words[0]);
            outputKey.setDocname("!alldoc");
            context.write(outputKey, outputValue);//读入Job2的输出文件,格式为“wordname,idf”。
        }
    }

    public static class Tfidfreducer extends
            Reducer<Docword, Docword, Text, DoubleWritable> {
        Text outputKey=new Text();
        DoubleWritable outputValue=new DoubleWritable();
        @Override
        protected void reduce(Docword key, Iterable<Docword> values,
                Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            double termidf=0.0,termfq=0.0;
            for(Docword value:values){
                //读入job2的输入数据后,设置filename为“!alldoc”。由于!的ASCII码小于所有字母,所以同一个word的job2记录在shuffle中排在job1记录前面。
                if(value.getDocname().equals("!alldoc")){
                    termidf=value.getIndex();
                }else{
                    termfq=value.getIndex();
                    outputKey.set(value.getDocname()+","+value.getWord());
                    outputValue.set(termidf*termfq);
                    context.write(outputKey, outputValue);
                }
            }
        }
    }
    public static class Tfidfsort extends WritableComparator {

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            // TODO Auto-generated method stub
            Docword lhs=(Docword)a;
            Docword rhs=(Docword)b;
            int result=0;
            result=(result!=0)?result:lhs.getWord().compareTo(rhs.getWord());
            result=(result!=0)?result:lhs.getDocname().compareTo(rhs.getDocname());
            return result;//按照wordname、filename进行排序。
        }

        public Tfidfsort() {
            super(Docword.class,true);
            // TODO Auto-generated constructor stub
        }

    }
    public static class Tfidfpartition extends Partitioner<Docword, Docword> {

        @Override
        public int getPartition(Docword key, Docword value,
                int numPartitions) {
            // TODO Auto-generated method stub
            return Math.abs((key.getWord().hashCode()))%numPartitions;
        }

    }
    public static class Tfidfgroup extends WritableComparator {

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            // TODO Auto-generated method stub
            Docword lhs=(Docword)a;
            Docword rhs=(Docword)b;
            return lhs.getWord().compareTo(rhs.getWord());//按照wordname进行合并。
        }

        public Tfidfgroup() {
            super(Docword.class,true);
            // TODO Auto-generated constructor stub
        }

    }
    public int run(String []args) throws Exception{
        Path in1=new Path("data/wordcount");//输入文件路径
        Path out1=new Path("output/tfidf-tf");//Job1的tf结果输出路径
        Path out2=new Path("output/tfidf-idf");//Job2的idf结果输出路径
        Path out3=new Path("output/tfidf-tfidf");//Job3的tfidf结果输出路径
        URI skipstr=new URI("data/skipstr");//Job1的待删除字符串路径

        //============Job1配置============
        Job job1=Job.getInstance(getConf(), "tfidf-tf");
        Configuration conf1=job1.getConfiguration();
        job1.setJarByClass(getClass());

        FileInputFormat.setInputPaths(job1, in1);

        out1.getFileSystem(conf1).delete(out1, true);
        FileOutputFormat.setOutputPath(job1, out1);
        conf1.set(TextOutputFormat.SEPERATOR, DELIMITER);

        job1.setInputFormatClass(TextInputFormat.class);
        job1.setOutputFormatClass(TextOutputFormat.class);

        job1.setMapperClass(Tfmapper.class);
        job1.setMapOutputKeyClass(Docword.class);
        job1.setMapOutputValueClass(IntWritable.class);
        job1.setCombinerClass(Tfcombiner.class);
        job1.setReducerClass(Tfreducer.class);
        job1.setOutputKeyClass(Docword.class);
        job1.setOutputValueClass(DoubleWritable.class);
        job1.setPartitionerClass(Tfpartition.class);
        job1.addCacheFile(skipstr);
        job1.setNumReduceTasks(3);

        if(job1.waitForCompletion(true)==false)
            return 1;

        //============Job2配置============
        Job job2=Job.getInstance(getConf(), "tfidf-idf");
        Configuration conf2=job2.getConfiguration();
        job2.setJarByClass(getClass());

        FileInputFormat.setInputPaths(job2, out1);
        out2.getFileSystem(conf2).delete(out2, true);

        //利用HDFS API接口,获得输入文件总数,并通过变量filesum传递给Job2。
        FileSystem hdfs=FileSystem.get(conf2);
        FileStatus p[]=hdfs.listStatus(in1);
        conf2.set("filesnum", Integer.toString(p.length));

        FileOutputFormat.setOutputPath(job2, out2);
        conf2.set(TextOutputFormat.SEPERATOR, DELIMITER);
        job2.setInputFormatClass(TextInputFormat.class);
        job2.setOutputFormatClass(TextOutputFormat.class);

        job2.setSortComparatorClass(Idfsort.class);
        job2.setGroupingComparatorClass(Idfsort.class);

        job2.setMapperClass(Idfmapper.class);
        job2.setMapOutputKeyClass(Docword.class);
        job2.setMapOutputValueClass(IntWritable.class);
        job2.setCombinerClass(Idfcombiner.class);
        job2.setReducerClass(Idfreducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(DoubleWritable.class);
        job2.setNumReduceTasks(3);
        job2.setPartitionerClass(Idfpartition.class);

        if(job2.waitForCompletion(true)==false)
            return 1;

        //============Job3配置============
        Job job3=Job.getInstance(getConf(), "tfidf-tfidf");
        Configuration conf3=job3.getConfiguration();
        job3.setJarByClass(getClass());

        out3.getFileSystem(conf3).delete(out3, true);
        FileOutputFormat.setOutputPath(job3, out3);
        conf3.set(TextOutputFormat.SEPERATOR, DELIMITER);
        job3.setOutputFormatClass(TextOutputFormat.class);

        //利用MultipleInputs,配置Map分别读取Job1、Job2的输出文件
        MultipleInputs.addInputPath(job3, out1, TextInputFormat.class, Tfidf_tfmapper.class);
        MultipleInputs.addInputPath(job3, out2, TextInputFormat.class, Tfidf_idfmapper.class);

        job3.setMapOutputKeyClass(Docword.class);
        job3.setMapOutputValueClass(Docword.class);

        job3.setReducerClass(Tfidfreducer.class);
        job3.setOutputKeyClass(Text.class);
        job3.setOutputValueClass(DoubleWritable.class);
        job3.setNumReduceTasks(3);
        job3.setSortComparatorClass(Tfidfsort.class);
        job3.setGroupingComparatorClass(Tfidfgroup.class);
        job3.setPartitionerClass(Tfidfpartition.class);
        return job3.waitForCompletion(true)?0:1;

    }
    public static void main(String []args) throws Exception{
        int result=0;
        try{
            result=ToolRunner.run(new Configuration(), new Tfidf(), args);
        }catch(Exception e){
            e.printStackTrace();
        }
        System.exit(result);
    }

}</span>


本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

TFIDF算法Hadoop实现 的相关文章

随机推荐