0
点赞
收藏
分享

微信扫一扫

Nutch抓取源码分析之Injector类


(1) 将URL集合进行格式化和过滤,消除其中的非法URL,并设定URL状态(UNFETCHED),按照一定方法进行初始化分值;

(2) 将URL进行合并,消除重复的URL入口;

(3) 将URL及其状态、分值存入crawldb数据库,与原数据库中重复的则删除旧的,更换新的。


inject方法

/**
 * Injects the seed URLs found under {@code urlDir} into the crawl database.
 *
 * Runs two MapReduce jobs: (1) a sort job that maps each line of the plain-text
 * URL files to a {@code <url, CrawlDatum>} pair in a temporary directory, and
 * (2) a merge job that folds those pairs into the existing crawl db,
 * deduplicating against URLs already present.
 *
 * @param crawlDb path of the crawl database to update
 * @param urlDir  directory of plain-text seed URL files
 * @throws IOException if either job or the filesystem cleanup fails
 */
public void inject(Path crawlDb, Path urlDir) throws IOException {
// Randomized temp dir for the sort job's output, to avoid collisions
// between concurrent inject runs.
Path tempDir = new Path(getConf().get("mapred.temp.dir", ".") +
"/inject-temp-"+
Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
try {
// Phase 1: map text input files to a <url, CrawlDatum> sequence file.
JobConf sortJob = new NutchJob(getConf());
sortJob.setJobName("inject " + urlDir);
FileInputFormat.addInputPath(sortJob, urlDir);
sortJob.setMapperClass(InjectMapper.class);
FileOutputFormat.setOutputPath(sortJob, tempDir);
sortJob.setOutputFormat(SequenceFileOutputFormat.class);
sortJob.setOutputKeyClass(Text.class);
sortJob.setOutputValueClass(CrawlDatum.class);
// Stamp a single injection time so every mapper uses the same value.
sortJob.setLong("injector.current.time", System.currentTimeMillis());
JobClient.runJob(sortJob);

// Phase 2: merge the new entries with the existing crawl db.
JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb);
FileInputFormat.addInputPath(mergeJob, tempDir);
mergeJob.setReducerClass(InjectReducer.class);
JobClient.runJob(mergeJob);
CrawlDb.install(mergeJob, crawlDb);
} finally {
// FIX: always remove the temp output, even when one of the jobs throws;
// previously a failed job leaked the temp directory. Also dropped the
// unused local `long end = System.currentTimeMillis()`.
FileSystem fs = FileSystem.get(getConf());
fs.delete(tempDir, true);
}
}

InjectMapper类

/**
 * Maps each line of a seed-URL file to a {@code <url, CrawlDatum>} pair.
 *
 * A line is "url[\tname=value]*": optional tab-separated metadata may follow
 * the URL. Two metadata names are special — nutchScoreMDName sets a custom
 * score and nutchFetchIntervalMDName a custom fetch interval; everything else
 * is stored verbatim in the datum's metadata. URLs are normalized and
 * filtered; lines starting with '#' and URLs rejected by the filters are
 * dropped.
 */
public static class InjectMapper implements Mapper<WritableComparable, Text, Text, CrawlDatum> {
private URLNormalizers urlNormalizers; // URL normalizer (SCOPE_INJECT)
private int interval;                  // default fetch interval, seconds
private float scoreInjected;           // default score for injected URLs
private JobConf jobConf;
private URLFilters filters;            // URL filter chain
private ScoringFilters scfilters;      // scoring filter chain
private long curTime;                  // injection timestamp set by the job

/** Reads defaults and builds the normalizer/filter/scoring chains. */
public void configure(JobConf job) {
this.jobConf = job;
urlNormalizers = new URLNormalizers(job, URLNormalizers.SCOPE_INJECT);
// 2592000 s = 30 days default fetch interval.
interval = jobConf.getInt("db.fetch.interval.default", 2592000);
filters = new URLFilters(jobConf);
scfilters = new ScoringFilters(jobConf);
scoreInjected = jobConf.getFloat("db.score.injected", 1.0f);
curTime = job.getLong("injector.current.time", System.currentTimeMillis());
}

public void close() {}

/**
 * Parses one input line, applies normalization/filtering, and emits the
 * resulting {@code <url, CrawlDatum>} pair (or nothing if rejected).
 */
public void map(WritableComparable key, Text value,
OutputCollector<Text, CrawlDatum> output, Reporter reporter)
throws IOException {
String url = value.toString(); // value is one line of text

// Skip comment lines.
if (url != null && url.trim().startsWith("#")) {
return;
}

// Optional metadata after the URL: tab-separated name=value tokens.
float customScore = -1f;
int customInterval = interval;
Map<String,String> metadata = new TreeMap<String,String>();
if (url.indexOf("\t")!=-1){
String[] splits = url.split("\t");
url = splits[0];
for (int s=1;s<splits.length;s++){
// Find the separator between name and value.
int indexEquals = splits[s].indexOf("=");
if (indexEquals==-1) {
// Skip anything without a '='.
continue;
}
String metaname = splits[s].substring(0, indexEquals);
String metavalue = splits[s].substring(indexEquals+1);
if (metaname.equals(nutchScoreMDName)) {
try {
customScore = Float.parseFloat(metavalue);
} catch (NumberFormatException nfe) {
// FIX: was an empty catch — log the bad value instead of
// silently falling back to the default score.
if (LOG.isWarnEnabled()) {
LOG.warn("Ignoring invalid custom score '" + metavalue + "' for " + url);
}
}
}
else if (metaname.equals(nutchFetchIntervalMDName)) {
try {
customInterval = Integer.parseInt(metavalue);
} catch (NumberFormatException nfe) {
// FIX: was an empty catch — log the bad value instead of
// silently falling back to the default interval.
if (LOG.isWarnEnabled()) {
LOG.warn("Ignoring invalid fetch interval '" + metavalue + "' for " + url);
}
}
}
else metadata.put(metaname,metavalue);
}
}
try {
// Normalize, then filter; filter() returns null for rejected URLs.
url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);
url = filters.filter(url);
} catch (Exception e) {
if (LOG.isWarnEnabled()) { LOG.warn("Skipping " +url+":"+e); }
url = null;
}
if (url != null) { // if it passes
value.set(url); // collect it
// CrawlDatum holds the injected status, fetch interval, fetch time
// and score for this URL.
CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_INJECTED, customInterval);
datum.setFetchTime(curTime);
// Copy the remaining metadata. FIX: iterate entrySet() instead of
// keySet() + get(), avoiding a second TreeMap lookup per entry.
for (Map.Entry<String,String> entry : metadata.entrySet()) {
datum.getMetaData().put(new Text(entry.getKey()), new Text(entry.getValue()));
}
if (customScore != -1) datum.setScore(customScore);
else datum.setScore(scoreInjected);
try {
// Give scoring plugins a chance to adjust the initial score.
scfilters.injectedScore(value, datum);
} catch (ScoringFilterException e) {
if (LOG.isWarnEnabled()) {
LOG.warn("Cannot filter injected score for url " + url
+ ", using default (" + e.getMessage() + ")");
}
}
output.collect(value, datum);
}
}
}


InjectReducer 类

/**
 * Merges freshly injected entries with any entry already present in the
 * crawl db for the same URL. A newly injected datum has its status switched
 * from STATUS_INJECTED to STATUS_DB_UNFETCHED; if the crawl db already holds
 * a datum for the URL, that existing datum wins and the injected one is
 * discarded.
 */
public static class InjectReducer implements Reducer<Text, CrawlDatum, Text, CrawlDatum> {
public void configure(JobConf job) {}
public void close() {}
// Reusable scratch datums, refilled on every reduce() call.
private CrawlDatum old = new CrawlDatum();
private CrawlDatum injected = new CrawlDatum();

public void reduce(Text key, Iterator<CrawlDatum> values,
OutputCollector<Text, CrawlDatum> output, Reporter reporter)
throws IOException {
boolean hasExisting = false;
for (Iterator<CrawlDatum> it = values; it.hasNext(); ) {
CrawlDatum datum = it.next();
if (datum.getStatus() == CrawlDatum.STATUS_INJECTED) {
// Freshly injected URL: mark it as not yet fetched.
injected.set(datum);
injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
} else {
// URL already known to the crawl db.
old.set(datum);
hasExisting = true;
}
}
// Never overwrite an existing crawl-db entry with the injected one.
output.collect(key, hasExisting ? old : injected);
}
}




举报

相关推荐

0 条评论