Mahout: SparseVectorsFromSequenceFiles Source Code Analysis


The driver starts with a series of option definitions, covering minSupport, analyzerName, chunkSize, weight, minDF, and so on:


Option chunkSizeOpt = obuilder.withLongName("chunkSize").withArgument(
    abuilder.withName("chunkSize").withMinimum(1).withMaximum(1).create()).withDescription(
    "The chunkSize in MegaBytes. 100-10000 MB").withShortName("chunk").create();
// term weight: TF or TFIDF
Option weightOpt = obuilder.withLongName("weight").withRequired(false).withArgument(
    abuilder.withName("weight").withMinimum(1).withMaximum(1).create()).withDescription(
    "The kind of weight to use. Currently TF or TFIDF").withShortName("wt").create();
// minimum document frequency, minDF
Option minDFOpt = obuilder.withLongName("minDF").withRequired(false).withArgument(
    abuilder.withName("minDF").withMinimum(1).withMaximum(1).create()).withDescription(
    "The minimum document frequency.  Default is 1").withShortName("md").create();
……



This is followed by a series of calls that read the option values supplied by the user:

Path inputDir = new Path((String) cmdLine.getValue(inputDirOpt));
Path outputDir = new Path((String) cmdLine.getValue(outputDirOpt));

int chunkSize = 100;
if (cmdLine.hasOption(chunkSizeOpt)) {
  chunkSize = Integer.parseInt((String) cmdLine.getValue(chunkSizeOpt));
}
int minSupport = 2;
if (cmdLine.hasOption(minSupportOpt)) {
  String minSupportString = (String) cmdLine.getValue(minSupportOpt);
  minSupport = Integer.parseInt(minSupportString);
}
……
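For orientation: this driver is what the mahout seq2sparse command runs, so the options above surface as command-line flags. A typical invocation looks roughly like the line below; the short names -wt, -md and -chunk come from the option definitions above, while -i/-o and the exact flag set are assumptions that may vary with the Mahout version.

mahout seq2sparse -i <seqfile-input-dir> -o <vector-output-dir> -wt tfidf -md 1 -chunk 100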




The input directory of SparseVectorsFromSequenceFiles contains SequenceFiles produced by SequenceFilesFromDirectory. A SequenceFile is Hadoop's own file format for storing key/value pairs. SparseVectorsFromSequenceFiles first runs the input SequenceFiles through DocumentProcessor and writes the result into the tokenized-documents subdirectory of the output directory.
DocumentProcessor is a map-only job (no reducer). It emits each key unchanged and tokenizes the value into a list of words: the text in the value is stripped of punctuation and split on whitespace.

SparseVectorsFromSequenceFiles contains the following lines:

Configuration conf = getConf();
Path tokenizedPath = new Path(outputDir, DocumentProcessor.TOKENIZED_DOCUMENT_OUTPUT_FOLDER);
// TODO: move this into DictionaryVectorizer, and then fold SparseVectorsFrom with
// EncodedVectorsFrom to have one framework for all of this.
DocumentProcessor.tokenizeDocuments(inputDir, analyzerClass, tokenizedPath, conf);




Now look at the job that processes the text, DocumentProcessor.tokenizeDocuments(). It has a single mapper, SequenceFileTokenizerMapper.

public static void tokenizeDocuments(Path input,
                                     Class<? extends Analyzer> analyzerClass,
                                     Path output,
                                     Configuration baseConf)
    throws IOException, InterruptedException, ClassNotFoundException {
  Configuration conf = new Configuration(baseConf);
  // this conf parameter needs to be set enable serialisation of conf values
  conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
                                + "org.apache.hadoop.io.serializer.WritableSerialization");
  conf.set(ANALYZER_CLASS, analyzerClass.getName());
  Job job = new Job(conf);
  job.setJobName("DocumentProcessor::DocumentTokenizer: input-folder: " + input);
  job.setJarByClass(DocumentProcessor.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(StringTuple.class);
  FileInputFormat.setInputPaths(job, input);
  FileOutputFormat.setOutputPath(job, output);

  job.setMapperClass(SequenceFileTokenizerMapper.class);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setNumReduceTasks(0);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  HadoopUtil.delete(conf, output);

  boolean succeeded = job.waitForCompletion(true);
  if (!succeeded)
    throw new IllegalStateException("Job failed!");
}
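The mapper itself is not shown in the excerpt above. The following is a hedged sketch, a paraphrase rather than the verbatim Mahout source, of what SequenceFileTokenizerMapper does: it runs the configured Lucene Analyzer over the document body, collects the resulting terms into a StringTuple, and re-emits them under the original key (the exact Analyzer/TokenStream calls depend on the Lucene version bundled with your Mahout release).

// Hedged sketch of SequenceFileTokenizerMapper (paraphrase, not the exact Mahout code).
public class SequenceFileTokenizerMapperSketch extends Mapper<Text, Text, Text, StringTuple> {

  private Analyzer analyzer; // in Mahout this is instantiated in setup() from the ANALYZER_CLASS conf entry

  @Override
  protected void map(Text key, Text value, Context context)
      throws IOException, InterruptedException {
    // tokenize the document text with the Lucene analyzer
    TokenStream stream = analyzer.tokenStream(key.toString(), new StringReader(value.toString()));
    CharTermAttribute termAtt = stream.addAttribute(CharTermAttribute.class);
    StringTuple document = new StringTuple();
    stream.reset();
    while (stream.incrementToken()) {
      if (termAtt.length() > 0) {
        document.add(new String(termAtt.buffer(), 0, termAtt.length()));
      }
    }
    stream.end();
    stream.close();
    // key unchanged, value is the list of tokens
    context.write(key, document);
  }
}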



After tokenization, the TF-IDF computation begins.

Computing TF-IDF
If the user-supplied maxDFSigma is greater than 0, the output directory is tf-vectors-toprune; otherwise it is tf-vectors.
The work is done by the static method createTermFrequencyVectors() of the DictionaryVectorizer class.
The first step of the TF-IDF computation is a word count.
If the n-gram size is 1, it is handled directly by startWordCounting(), a job configured as follows:


outputKey = Text
outputValue = LongWritable
Mapper = TermCountMapper (org.apache.mahout.vectorizer.term)
Combiner = TermCountCombiner
Reducer = TermCountReducer
Output format = SequenceFileOutputFormat
Output directory = wordcount


In plain terms, this is the first program everyone meets when learning Hadoop: word count (a sketch follows below).
Otherwise (n-gram size greater than 1), the counting is done by CollocDriver.generateAllGrams(), which runs two jobs:

generateCollocations
computeNGramsPruneByLLR
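For the unigram case, the mapper/reducer pair listed above amounts to a word count over the tokenized documents. The bodies below are my paraphrase under that assumption, not Mahout's exact code; the real TermCountMapper pre-aggregates counts per document, and the real TermCountReducer also drops terms whose total count is below minSupport.

// Hedged paraphrase of the unigram word count over StringTuple documents.
class TermCountMapperSketch extends Mapper<Text, StringTuple, Text, LongWritable> {
  private static final LongWritable ONE = new LongWritable(1);

  @Override
  protected void map(Text key, StringTuple value, Context context)
      throws IOException, InterruptedException {
    // emit (term, 1) for every token of the tokenized document
    for (String term : value.getEntries()) {
      context.write(new Text(term), ONE);
    }
  }
}

class TermCountReducerSketch extends Reducer<Text, LongWritable, Text, LongWritable> {
  @Override
  protected void reduce(Text key, Iterable<LongWritable> values, Context context)
      throws IOException, InterruptedException {
    long sum = 0;
    for (LongWritable count : values) {
      sum += count.get();
    }
    context.write(key, new LongWritable(sum)); // term -> corpus-wide occurrence count
  }
}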


The second step assigns an id to every word (assign ids to the feature list).

This is handled by createDictionaryChunks(): the input directory is wordcount, and the output files are dictionary.file-*, one file per chunk number.


int i = 0;
for (Pair<Writable, Writable> record
     : new SequenceFileDirIterable<Writable, Writable>(filesPattern, PathType.GLOB, null, null, true, conf)) {
  if (currentChunkSize > chunkSizeLimit) {
    Closeables.closeQuietly(dictWriter);
    chunkIndex++;
    chunkPath = new Path(dictionaryPathBase, DICTIONARY_FILE + chunkIndex);
    chunkPaths.add(chunkPath);
    dictWriter = new SequenceFile.Writer(fs, conf, chunkPath, Text.class, IntWritable.class);
    currentChunkSize = 0;
  }
  Writable key = record.getFirst();
  // estimated entry size: overhead + 2 bytes per character of the term + 4 bytes for the int id
  int fieldSize = DICTIONARY_BYTE_OVERHEAD + key.toString().length() * 2 + Integer.SIZE / 8;
  currentChunkSize += fieldSize;
  dictWriter.append(key, new IntWritable(i++)); // assign the id!
}
maxTermDimension[0] = i;



Ids are assigned starting from 0, and the final word count i is stored in maxTermDimension[0].

The third step builds the partial vectors (PartialVector).
After the initial tokenization, each document is stored in a key/value SequenceFile, where the key is the relative path and the value is the word list of the whole document.
The dictionary produced in the previous step maps each word to an id and is also written to SequenceFiles.
The mapper passes the tokenized documents through unchanged; part of the reducer looks like this:


protected void reduce(Text key, Iterable<StringTuple> values, Context context)
    throws IOException, InterruptedException {
  Iterator<StringTuple> it = values.iterator();
  if (!it.hasNext()) {
    return;
  }
  StringTuple value = it.next();
  Vector vector = new RandomAccessSparseVector(dimension, value.length()); // guess at initial size
  for (String term : value.getEntries()) {
    if (!term.isEmpty() && dictionary.containsKey(term)) { // unigram
      int termId = dictionary.get(term);
      vector.setQuick(termId, vector.getQuick(termId) + 1);
    }
  }
  if (vector.getNumNondefaultElements() > 0) {
    VectorWritable vectorWritable = new VectorWritable(vector);
    context.write(key, vectorWritable);
  }
}



With the tokenized documents and the dictionary as input, each document yields a vector (of type RandomAccessSparseVector, which is essentially a hash map) that stores, for that document, each term's id and its frequency.
The result is then written out as key/vector pairs.
Because the dictionary produced in the previous step can be very large, it was split into chunks; each pass of this reduce only resolves ids from a single dictionary chunk, so the job runs once per chunk and the partial results are merged at the end. The merge is done by a job set up in PartialVectorMerger.mergePartialVectors().
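Conceptually, the merge reducer just adds up the partial vectors emitted for the same document key. The method below is a hedged sketch, a paraphrase of the merge reducer rather than the exact source, ignoring options such as normalization and sequential-access output; dimension is assumed to be the field holding the dictionary size.

// Hedged sketch: element-wise sum of the partial TF vectors produced for one document
// across the per-dictionary-chunk passes.
protected void reduce(WritableComparable<?> key, Iterable<VectorWritable> values, Context context)
    throws IOException, InterruptedException {
  Vector merged = new RandomAccessSparseVector(dimension, 10);
  for (VectorWritable partial : values) {
    merged.assign(partial.get(), Functions.PLUS); // add this chunk's counts
  }
  context.write(key, new VectorWritable(merged));
}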
By default IDF is not computed; if the corresponding option is given, IDF is computed after the partial (TF) vectors above, taking the TF directory as its input.

if (shouldPrune || processIdf) {
  docFrequenciesFeatures = TFIDFConverter.calculateDF(new Path(outputDir, tfDirName),
      outputDir, conf, chunkSize);
}



The IDF (document-frequency) computation is fairly straightforward.
Its mapper:

protected void map(WritableComparable<?> key, VectorWritable value, Context context)
    throws IOException, InterruptedException {
  Vector vector = value.get();
  Iterator<Vector.Element> it = vector.iterateNonZero();
  while (it.hasNext()) {
    Vector.Element e = it.next();
    context.write(new IntWritable(e.index()), ONE);
  }
  context.write(TOTAL_COUNT, ONE);
}




The input is key/vector pairs. The mapper walks the non-zero entries of each TF vector; the index of each entry is already the term's dictionary id, and for each one it emits (id, 1). It also emits (TOTAL_COUNT, 1) once per document, so the total document count falls out of the same job. The output key is now the term id.
The reducer:

protected void reduce(IntWritable key, Iterable<LongWritable> values, Context context)
    throws IOException, InterruptedException {
  long sum = 0;
  for (LongWritable value : values) {
    sum += value.get();
  }
  context.write(key, new LongWritable(sum));
}



The values for the same key are summed, so this is yet another word-count-style job. The result is, for each term key, its document frequency DF, i.e. the number of documents it appears in (not the number of times it occurs).
The output directory is df-count; as with the TF computation, it is written to HDFS in several chunks.
If requested, there is also a standard-deviation computation:


double stdDev = BasicStats.stdDevForGivenMean(dfDir, stdCalcDir, 0.0, conf);
long vectorCount = docFrequenciesFeatures.getFirst()[1];
maxDF = (int) (100.0 * maxDFSigma * stdDev / vectorCount);
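Note that maxDF here is a percentage of the document count: the TF-IDF reducer shown later compares 100.0 * df / vectorCount against it. As a made-up example, with maxDFSigma = 3.0, stdDev = 200 and vectorCount = 50000, we get maxDF = (int) (100.0 * 3.0 * 200 / 50000) = 1, meaning terms that appear in more than 1% of the documents are pruned.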



followed by a vector-pruning step:

pruneVectors(tfDir,
             prunedTFDir,
             prunedPartialTFDir,
             maxDF,
             conf,
             docFrequenciesFeatures,
             -1.0f,
             false,
             reduceTasks);



Finally, TF-IDF itself is computed:

public static void processTfIdf(Path input,
                                Path output,
                                Configuration baseConf,
                                Pair<Long[], List<Path>> datasetFeatures,
                                int minDf,
                                long maxDF,
                                float normPower,
                                boolean logNormalize,
                                boolean sequentialAccessOutput,
                                boolean namedVector,
                                int numReducers)




The mapper passes its input through unchanged; part of the reducer:

protected void reduce(WritableComparable<?> key, Iterable<VectorWritable> values, Context context)
    throws IOException, InterruptedException {
  Iterator<VectorWritable> it = values.iterator();
  if (!it.hasNext()) {
    return;
  }
  Vector value = it.next().get();
  Iterator<Vector.Element> it1 = value.iterateNonZero();
  Vector vector = new RandomAccessSparseVector((int) featureCount, value.getNumNondefaultElements());
  while (it1.hasNext()) {
    Vector.Element e = it1.next();
    if (!dictionary.containsKey(e.index())) {
      continue;
    }
    long df = dictionary.get(e.index());
    if (maxDf > -1 && (100.0 * df) / vectorCount > maxDf) {
      continue;
    }
    if (df < minDf) {
      df = minDf;
    }
    vector.setQuick(e.index(), tfidf.calculate((int) e.get(), (int) df, (int) featureCount, (int) vectorCount));
  }

  if (sequentialAccess) {
    vector = new SequentialAccessSparseVector(vector);
  }
  if (namedVector) {
    vector = new NamedVector(vector, key.toString());
  }
  VectorWritable vectorWritable = new VectorWritable(vector);
  context.write(key, vectorWritable);
}




Note the line

vector.setQuick(e.index(), tfidf.calculate((int) e.get(), (int) df, (int) featureCount, (int) vectorCount));

which takes the term's id, computes its tf*idf weight, and writes the result back into the vector.
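For reference, tfidf here is Mahout's TFIDF weight, which delegates to a Lucene Similarity, so the exact damping (roughly sqrt(tf) * (log(numDocs / (df + 1)) + 1)) depends on the Lucene version. The sketch below therefore shows only the classic textbook definition, as an illustration rather than the precise value Mahout writes.

// Classic tf-idf as an illustration; Mahout's TFIDF.calculate() applies Lucene-style
// damping, so the actual numbers it produces differ.
public final class TfIdfSketch {
  private TfIdfSketch() {}

  public static double tfIdf(int tf, int df, int numDocs) {
    return tf * Math.log((double) numDocs / df);
  }

  public static void main(String[] args) {
    // a term occurring 3 times in one document and present in 10 of 1000 documents
    System.out.println(tfIdf(3, 10, 1000)); // ≈ 13.8
  }
}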

The final context.write(key, vectorWritable) emits the document's relative path as the key and the vector of tf*idf term weights as the value.

At this point the computation is complete.

The directories produced over the whole run, and the details of all the jobs it launches, are shown in screenshots that are not reproduced here.

See also: http://hnote.org/big-data/mahout/sparsevectorsfromsequencefiles-2

