0
点赞
收藏
分享

微信扫一扫

hadoop-streaming多输入join与shuffle原理解读


多个输入的join在reducer的写法

def read_mapper_output(file,separator='\t'):
for line in file:
yield line.rstrip().split(separator,1)

def main(separator='\t'):
data=read_mapper_output(sys.stdin,separator=separator)
for feature, group in groupby(data, itemgetter(0)):
encode = ''
for info in group:
try:
items = info[1].split('\t')

多个输入的join在mapper的写法

import os

if "map_input_file" in os.environ:
map_input_file = os.environ["map_input_file"]
else:
map_input_file = 'youtube/deepfm'

with open(0, 'r', errors='ignore') as f_in:
if 'youtube/fm/v2' in map_input_file:
print('\t'.join([feature, 'encode', encode]))
if 'youtube/deepfm' in map_input_file:
print('\t'.join([feature, 'sample', encode]))

本地验证itertools的groupby

from itertools import groupby
from operator import itemgetter
x = [['2', '\t'.join(['sample', 'train'])], ['1', '\t'.join(['encode', 'pred'])], ['1', '\t'.join(['sample', 'pred'])]]
soooo = sorted(x, key=itemgetter(0))
for feature, group in groupby(soooo, key=itemgetter(0)):
for info in group:
print(info)
print(f'feature: {feature} info[1]: {info[1]}')

实验结果

['1', 'encode\tpred']
feature: 1 info[1]: encode pred
['1', 'sample\tpred']
feature: 1 info[1]: sample pred
['2', 'sample\ttrain']
feature: 2 info[1]: sample train

可以看出同一个key的记录被聚合到了一起,且encode排在了sample的前面,这是因为reducer端shuffle对sys.stdin中的字符串进行了排序

hadoop-streaming的shuffle过程

Shuffle阶段分为两部分:Map端和Reduce端。

hadoop-streaming多输入join与shuffle原理解读_hadoop

一 map端shuffle过程;

1-内存预排序:默认每个map有100M内存进行预排序(为了效率),超过阈值,会把内容写到磁盘;此过程使用快速排序算法

2-根据key和reducer的数量进行分区和排序;首先根据数据所属的Partition排序,然后每个Partition中再按Key排序;此过程排序默认使用归并排序算法

3-combiner,使得map的输出结果更紧凑,减少磁盘写入和传输的数据量。慎用,可能会对结果产生错误的结果;如果存在combiner阶段;

4-一个Map任务会产生多个spill文件,在Map任务完成前,所有的spill文件将会归并排序为一个索引文件和数据文件。当spill文件归并完成后,Map将删除所有的临时文件,并告知TaskTracker任务已完成。

二 reduce的shuffle阶段

1-copy阶段:Reduce端通过HTTP获取Map端的数据,只要有一个map任务完成,Reduce任务就开始复制它的输出。JobTracker知道Map输出与TaskTracker的映射关系,Reduce端有一个线程间歇地向JobTracker询问Map输出的地址,直到把所有的数据都获取到。

2-排序阶段,又称合并阶段。将多个已经排序的文件合并成一个文件。Merge有三种形式:内存到内存,内存到磁盘,磁盘到磁盘。

此过程顺序比较插入排序算法,可能都不叫算法。只是对多个已排序文件合并成一个文件。


举报

相关推荐

0 条评论