A series of calls that add command-line options, including minSupport, analyzerName, chunkSize, weight, minDF, and so on.
Option chunkSizeOpt = obuilder.withLongName("chunkSize").withArgument(
  abuilder.withName("chunkSize").withMinimum(1).withMaximum(1).create()).withDescription(
  "The chunkSize in MegaBytes. 100-10000 MB").withShortName("chunk").create();
// term weight: TF or TFIDF
Option weightOpt = obuilder.withLongName("weight").withRequired(false).withArgument(
  abuilder.withName("weight").withMinimum(1).withMaximum(1).create()).withDescription(
  "The kind of weight to use. Currently TF or TFIDF").withShortName("wt").create();
// minimum document frequency (minDF)
Option minDFOpt = obuilder.withLongName("minDF").withRequired(false).withArgument(
  abuilder.withName("minDF").withMinimum(1).withMaximum(1).create()).withDescription(
  "The minimum document frequency. Default is 1").withShortName("md").create();
……
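Before these values can be read back, the individual Options are usually collected into a Group and run through a commons-cli2 Parser. The following is a minimal sketch of that pattern, not the verbatim Mahout source; the group name and the set of options shown are illustrative.
// Sketch: assemble the options built above into a group and parse the arguments.
// GroupBuilder, Group, Parser and CommandLine come from org.apache.commons.cli2.
GroupBuilder gbuilder = new GroupBuilder();
Group group = gbuilder.withName("Options")
    .withOption(chunkSizeOpt)
    .withOption(weightOpt)
    .withOption(minDFOpt)
    .create();

Parser parser = new Parser();
parser.setGroup(group);
CommandLine cmdLine = parser.parse(args); // throws OptionException on bad input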
A series of calls that read the option values supplied by the user:
Path inputDir = new Path((String) cmdLine.getValue(inputDirOpt));
Path outputDir = new Path((String) cmdLine.getValue(outputDirOpt));
int chunkSize = 100;
if (cmdLine.hasOption(chunkSizeOpt)) {
  chunkSize = Integer.parseInt((String) cmdLine.getValue(chunkSizeOpt));
}
int minSupport = 2;
if (cmdLine.hasOption(minSupportOpt)) {
  String minSupportString = (String) cmdLine.getValue(minSupportOpt);
  minSupport = Integer.parseInt(minSupportString);
}
……
The input directory for SparseVectorsFromSequenceFiles contains SequenceFiles produced by SequenceFilesFromDirectory. SequenceFile is Hadoop's own file format for storing key/value pairs. SparseVectorsFromSequenceFiles first runs the input SequenceFiles through DocumentProcessor and writes the result to the tokenized-documents subdirectory of the output directory.
DocumentProcessor is a map-only job (it has no reducer). It emits each key unchanged, and tokenizes the value into a list of words (a StringTuple): the document text is stripped of punctuation and split on whitespace.
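For intuition, the tokenized output can be inspected with a plain SequenceFile reader. A rough sketch follows; the part-file name is hypothetical, and the older SequenceFile.Reader constructor is used for brevity.
// Sketch: dump the <Text docPath, StringTuple tokens> pairs written to tokenized-documents.
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path path = new Path("output/tokenized-documents/part-m-00000"); // hypothetical part file
SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
Text key = new Text();
StringTuple value = new StringTuple();
while (reader.next(key, value)) {
  System.out.println(key + " => " + value.getEntries());
}
reader.close();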
SparseVectorsFromSequenceFiles contains the following lines:
Configuration conf = getConf();
Path tokenizedPath =
    new Path(outputDir, DocumentProcessor.TOKENIZED_DOCUMENT_OUTPUT_FOLDER);
// TODO: move this into DictionaryVectorizer, and then fold SparseVectorsFrom with
// EncodedVectorsFrom to have one framework for all of this.
DocumentProcessor.tokenizeDocuments(inputDir, analyzerClass, tokenizedPath, conf);
Now look at the job that processes the text, DocumentProcessor.tokenizeDocuments(); it has a single mapper, SequenceFileTokenizerMapper.
public static void tokenizeDocuments(Path input,
                                     Class<? extends Analyzer> analyzerClass,
                                     Path output,
                                     Configuration baseConf)
  throws IOException, InterruptedException, ClassNotFoundException {
  Configuration conf = new Configuration(baseConf);
  // this conf parameter needs to be set to enable serialization of conf values
  conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
      + "org.apache.hadoop.io.serializer.WritableSerialization");
  conf.set(ANALYZER_CLASS, analyzerClass.getName());
  Job job = new Job(conf);
  job.setJobName("DocumentProcessor::DocumentTokenizer: input-folder: " + input);
  job.setJarByClass(DocumentProcessor.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(StringTuple.class);
  FileInputFormat.setInputPaths(job, input);
  FileOutputFormat.setOutputPath(job, output);
  job.setMapperClass(SequenceFileTokenizerMapper.class);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setNumReduceTasks(0);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  HadoopUtil.delete(conf, output);
  boolean succeeded = job.waitForCompletion(true);
  if (!succeeded) {
    throw new IllegalStateException("Job failed!");
  }
}
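The mapper itself is small: it runs the configured Lucene Analyzer over the document text and collects the resulting tokens into a StringTuple. The following is a sketch of what its map() does, not the verbatim Mahout source; the token-stream calls can differ slightly between Lucene versions.
// Sketch of SequenceFileTokenizerMapper.map(): key = document path, value = raw text;
// the output value is a StringTuple holding the analyzed tokens.
protected void map(Text key, Text value, Context context)
  throws IOException, InterruptedException {
  TokenStream stream = analyzer.tokenStream(key.toString(), new StringReader(value.toString()));
  CharTermAttribute termAtt = stream.addAttribute(CharTermAttribute.class);
  StringTuple document = new StringTuple();
  stream.reset();
  while (stream.incrementToken()) {
    if (termAtt.length() > 0) {
      document.add(new String(termAtt.buffer(), 0, termAtt.length()));
    }
  }
  stream.end();
  stream.close();
  context.write(key, document);
}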
After tokenization, the TF-IDF computation begins.
Computing TF-IDF
If the user-supplied maxDFSigma is greater than 0, the output directory is tf-vectors-toprune; otherwise it is tf-vectors.
The work is done by the static method createTermFrequencyVectors() of the DictionaryVectorizer class.
The first step of the TF-IDF computation is WordCount.
If the maximum n-gram size is 1, it is handled directly by the startWordCounting() method:
output key = Text
output value = LongWritable
Mapper = TermCountMapper (org.apache.mahout.vectorizer.term)
Combiner = TermCountCombiner
Reducer = TermCountReducer
output format = SequenceFileOutputFormat
output directory = wordcount
Put plainly, this is the first program every Hadoop beginner writes: WordCount (a sketch follows this list).
Otherwise it is handled by CollocDriver.generateAllGrams(), which runs two jobs:
generateCollocations
computeNGramsPruneByLLR
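For the unigram case, the following is a minimal sketch of that word-counting pattern over the tokenized documents. It is simplified: the class names are illustrative, and the real TermCountMapper/TermCountReducer also aggregate counts per document and honor minSupport.
// Sketch of the unigram word count: input value = StringTuple of a document's tokens,
// output = <term, total count across all documents>.
public static class SketchTermCountMapper extends Mapper<Text, StringTuple, Text, LongWritable> {
  @Override
  protected void map(Text key, StringTuple value, Context context)
      throws IOException, InterruptedException {
    for (String term : value.getEntries()) {
      context.write(new Text(term), new LongWritable(1));
    }
  }
}

public static class SketchTermCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
  @Override
  protected void reduce(Text key, Iterable<LongWritable> values, Context context)
      throws IOException, InterruptedException {
    long sum = 0;
    for (LongWritable v : values) {
      sum += v.get();
    }
    // the real reducer drops terms whose count is below minSupport
    context.write(key, new LongWritable(sum));
  }
}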
The second step assigns an id to every word (assign ids to the feature list).
This is handled by createDictionaryChunks(): the input directory is wordcount, and the output files are dictionary.file-*, one file per chunk, numbered by chunk index.
int i = 0;
for (Pair<Writable, Writable> record
    : new SequenceFileDirIterable<Writable, Writable>(filesPattern, PathType.GLOB, null, null, true, conf)) {
  if (currentChunkSize > chunkSizeLimit) {
    Closeables.closeQuietly(dictWriter);
    chunkIndex++;
    chunkPath = new Path(dictionaryPathBase, DICTIONARY_FILE + chunkIndex);
    chunkPaths.add(chunkPath);
    dictWriter = new SequenceFile.Writer(fs, conf, chunkPath, Text.class, IntWritable.class);
    currentChunkSize = 0;
  }
  Writable key = record.getFirst();
  int fieldSize = DICTIONARY_BYTE_OVERHEAD + key.toString().length() * 2 + Integer.SIZE / 8;
  currentChunkSize += fieldSize;
  dictWriter.append(key, new IntWritable(i++)); // assign the id!
}
maxTermDimension[0] = i;
Ids are assigned starting from 0, and the final number of terms, i, is stored in maxTermDimension[0].
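To make the chunk accounting concrete (hypothetical numbers): each dictionary entry is estimated at DICTIONARY_BYTE_OVERHEAD plus 2 bytes per character of the term plus Integer.SIZE / 8 = 4 bytes for the id, so a 6-character term costs roughly the fixed overhead + 12 + 4 bytes. Once currentChunkSize exceeds chunkSizeLimit (derived from the chunkSize option, given in megabytes), the writer rolls over to a new dictionary.file-N chunk.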
The third step builds the partial vectors.
After the initial tokenization, each document is stored in a key/value SequenceFile, where the key is the document's relative path and the value is the list of the document's words.
The dictionary produced in the previous step maps each word to an id; it too is written to SequenceFiles.
The mapper emits the tokenized documents unchanged; part of the reducer is shown below:
protected void reduce(Text key, Iterable<StringTuple> values, Context context)
  throws IOException, InterruptedException {
  Iterator<StringTuple> it = values.iterator();
  if (!it.hasNext()) {
    return;
  }
  StringTuple value = it.next();
  Vector vector = new RandomAccessSparseVector(dimension, value.length()); // guess at initial size
  for (String term : value.getEntries()) {
    if (!term.isEmpty() && dictionary.containsKey(term)) { // unigram
      int termId = dictionary.get(term);
      vector.setQuick(termId, vector.getQuick(termId) + 1);
    }
  }
  if (vector.getNumNondefaultElements() > 0) {
    VectorWritable vectorWritable = new VectorWritable(vector);
    context.write(key, vectorWritable);
  }
}
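The dictionary map used by this reducer is populated up front in setup() from one dictionary chunk. The following is a hedged sketch of that loading step: the conf key name is hypothetical, a plain HashMap stands in for Mahout's hash map class, and Mahout actually ships the chunk to the task via the DistributedCache.
// Sketch: build the term -> id map from one dictionary chunk before reducing.
// The chunk is a SequenceFile of <Text term, IntWritable id> pairs, as written
// by createDictionaryChunks above.
private final Map<String, Integer> dictionary = new HashMap<String, Integer>();

@Override
protected void setup(Context context) throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();
  Path dictionaryChunk = new Path(conf.get("dictionary.chunk.path")); // hypothetical conf key
  for (Pair<Text, IntWritable> record
      : new SequenceFileIterable<Text, IntWritable>(dictionaryChunk, true, conf)) {
    dictionary.put(record.getFirst().toString(), record.getSecond().get());
  }
}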
With the tokenized documents and the dictionary as input, each document is turned into a vector (of type RandomAccessSparseVector, which is essentially a hash map) that stores, for that document, each term id and its frequency.
The result is then written out as key/vector pairs.
Because the dictionary produced in the previous step can be very large, it was split into chunks; each reduce pass looks up ids in only one dictionary chunk, so the work is done in several passes and the partial results are merged at the end. The merge is performed by a job set up in PartialVectorMerger.mergePartialVectors().
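The merge itself is simple in principle: the partial vectors emitted for the same document key cover disjoint ranges of term ids, so they can be summed element-wise. The following is a sketch of that idea using Mahout's vector API, not the verbatim PartialVectorMergeReducer; dimension is the dictionary size (maxTermDimension[0]).
// Sketch: merge the partial vectors for one document key by summing them.
protected void reduce(WritableComparable<?> key, Iterable<VectorWritable> values, Context context)
  throws IOException, InterruptedException {
  Vector merged = new RandomAccessSparseVector(dimension, 10);
  for (VectorWritable partial : values) {
    merged.assign(partial.get(), Functions.PLUS); // element-wise addition
  }
  context.write(key, new VectorWritable(merged));
}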
IDF is not computed by default; if the corresponding option is specified, IDF is computed after the partial TF vectors from the previous step, with the TF directory as input.
if (shouldPrune || processIdf) {
  docFrequenciesFeatures = TFIDFConverter.calculateDF(new Path(outputDir, tfDirName),
      outputDir, conf, chunkSize);
}
The IDF computation is fairly straightforward.
Its Mapper:
protected void map(WritableComparable<?> key, VectorWritable value, Context context)
  throws IOException, InterruptedException {
  Vector vector = value.get();
  Iterator<Vector.Element> it = vector.iterateNonZero();
  while (it.hasNext()) {
    Vector.Element e = it.next();
    context.write(new IntWritable(e.index()), ONE);
  }
  context.write(TOTAL_COUNT, ONE);
}
The input is key/vector pairs. The mapper walks the non-zero elements of each vector; for every term present it emits that term's dictionary id as the key with a count of 1, so the key is now the term id. It also emits one TOTAL_COUNT record per document, which is later used to obtain the total number of documents.
Reducer:
protected void reduce(IntWritable key, Iterable<LongWritable> values, Context context)
  throws IOException, InterruptedException {
  long sum = 0;
  for (LongWritable value : values) {
    sum += value.get();
  }
  context.write(key, new LongWritable(sum));
}
The values for each key are summed, which is again a WordCount-style program. This yields, for every term, the number of documents it appears in, i.e. its document frequency DF (not the number of times it occurs within documents).
The output directory is df-count; as with the TF computation, it is written to HDFS in several chunks.
If requested, there is a step that computes the standard deviation:
double stdDev = BasicStats.stdDevForGivenMean(dfDir, stdCalcDir, 0.0, conf);
long vectorCount = docFrequenciesFeatures.getFirst()[1];
maxDF = (int) (100.0 * maxDFSigma * stdDev / vectorCount);
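As a hypothetical worked example: with maxDFSigma = 3.0, a DF standard deviation stdDev = 200, and vectorCount = 5000 documents, maxDF = (int) (100.0 * 3.0 * 200 / 5000) = 12, i.e. terms that appear in more than 12% of the documents will be pruned in the next step (compare the (100.0 * df) / vectorCount > maxDf check in the TF-IDF reducer below).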
as well as a vector-pruning step:
pruneVectors(tfDir,
             prunedTFDir,
             prunedPartialTFDir,
             maxDF,
             conf,
             docFrequenciesFeatures,
             -1.0f,
             false,
             reduceTasks);
Finally, the TF-IDF values are computed:
public static void processTfIdf(Path input,
                                Path output,
                                Configuration baseConf,
                                Pair<Long[], List<Path>> datasetFeatures,
                                int minDf,
                                long maxDF,
                                float normPower,
                                boolean logNormalize,
                                boolean sequentialAccessOutput,
                                boolean namedVector,
                                int numReducers)
The Mapper passes its input through unchanged; part of the Reducer is shown below:
protected void reduce(WritableComparable<?> key, Iterable<VectorWritable> values, Context context)
  throws IOException, InterruptedException {
  Iterator<VectorWritable> it = values.iterator();
  if (!it.hasNext()) {
    return;
  }
  Vector value = it.next().get();
  Iterator<Vector.Element> it1 = value.iterateNonZero();
  Vector vector = new RandomAccessSparseVector((int) featureCount, value.getNumNondefaultElements());
  while (it1.hasNext()) {
    Vector.Element e = it1.next();
    if (!dictionary.containsKey(e.index())) {
      continue;
    }
    long df = dictionary.get(e.index());
    if (maxDf > -1 && (100.0 * df) / vectorCount > maxDf) {
      continue;
    }
    if (df < minDf) {
      df = minDf;
    }
    vector.setQuick(e.index(), tfidf.calculate((int) e.get(), (int) df, (int) featureCount, (int) vectorCount));
  }
  if (sequentialAccess) {
    vector = new SequentialAccessSparseVector(vector);
  }
  if (namedVector) {
    vector = new NamedVector(vector, key.toString());
  }
  VectorWritable vectorWritable = new VectorWritable(vector);
  context.write(key, vectorWritable);
}
Note that
vector.setQuick(e.index(), tfidf.calculate((int) e.get(), (int) df, (int) featureCount, (int) vectorCount));
first obtains the term's id, then computes tf*idf and writes it back into the vector.
The final context.write(key, vectorWritable) emits output whose key is the document's relative path and whose value is the vector of tf*idf term weights.
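The tfidf object here is Mahout's TFIDF weight, which delegates to Lucene's DefaultSimilarity, so the weight is roughly sqrt(tf) * (ln(numDocs / (df + 1)) + 1). The following is a small self-contained sketch of that formula for illustration, an approximation rather than the exact library code.
// Sketch of the weight that tfidf.calculate(tf, df, featureCount, vectorCount) produces,
// following Lucene's DefaultSimilarity formulas approximately.
public final class TfIdfSketch {
  public static double calculate(int tf, int df, int numDocs) {
    double tfPart = Math.sqrt(tf);                                // sub-linear TF damping
    double idfPart = Math.log((double) numDocs / (df + 1)) + 1.0; // smoothed IDF
    return tfPart * idfPart;
  }

  public static void main(String[] args) {
    // e.g. a term occurring 4 times in a document and present in 10 of 1000 documents
    System.out.println(calculate(4, 10, 1000)); // ≈ 2 * (ln(1000/11) + 1) ≈ 11.0
  }
}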
At this point, the computation is complete.
The directory layout produced by the whole process, and the details of all the jobs involved, are not reproduced here.