在了解到Hadoop的生态环境以及Hadoop单机模式和伪分布式模式安装配置之后,我们可以使用自己熟悉的语言来编写Hadoop MapReduce程序,进一步了解MapReduce编程模型。
本教程将使用Python语言为Hadoop编写一个简单的MapReduce程序:单词计数
尽管Hadoop框架是用Java编写的,但是为Hadoop编写的程序不必非要Java写,还可以使用其他语言开发,比如Python,Ruby,C++等
编写完成的MapReduce程序可以直接在你已经搭建好的伪分布式程序中调试运行。
MapReduce的Python代码
我们将使用Hadoop流API通过STDIN和STDOUT在Map和Reduce代码间传递数据。我们只需要使用Python的sys.stdin读取输入数据和打印输出到sys.stdout。这就是我们需要做的,因为Hadoop流会处理好其他的一切。
mapper.py
将下面的代码保存在文件 /home/hadoop/workspace/mapper.py
中。它将从STDIN
读取数据,拆分为单词并输出一组映射单词和它们数量(中间值)的行到STDOUT
。尽管这个Map脚本不会计算出单词出现次数的总和(中间值)。相反,它会立即输出 <word> 1
元组的形式——即使某个特定的单词可能会在输入中出现多次。在我们的例子中,我们让后续的Reduce做最终的总和计数。当然,你可以按照你的想法在你自己的脚本中修改这段代码。
需要给mapper.py文件赋予可执行权限:
chmod +x /home/hadoop/workspace/mapper.py
/home/hadoop/workspace/mapper.py
代码如下
#!/usr/bin/python
# -*- coding: utf-8 -*-
import sys
# 从标准输入STDIN输入
for line in sys.stdin:
# 移除line收尾的空白字符
line = line.strip()
# 将line分割为单词
words = line.split()
# 遍历
for word in words:
# 将结果写到标准输出STDOUT
# 此处的输出会作为Reduce代码的输入
print('{}\t{}'.format(word, 1))
reducer.py
将下面的代码保存在文件 /home/hadoop/workspace/reducer.py
中。它将从STDIN
读取mapper.py的结果(因此mapper.py的输出格式和reducer.py预期的输入格式必须匹配),然后统计每个单词出现的次数,最后将结果输出到STDOUT中。
需要给reducer.py文件赋予可执行权限:
chmod +x /home/hadoop/workspace/reducer.py
/home/hadoop/workspace/reducer.py代码如下
#!/usr/bin/python
# -*- coding: utf-8 -*-
import sys
current_word = None
current_count = 0
word = None
for line in sys.stdin:
# 移除line收尾的空白字符
line = line.strip()
# 解析我们从mapper.py得到的输入
word, count = line.split('\t', 1)
# 将字符串count转换为int
try:
count = int(count)
except ValueError:
# 不是数字,不做处理,跳过
continue
# hadoop在将kv对传递给reduce之前会进行按照key进行排序,在这里也就是word
if current_word == word:
current_count += count
else:
if current_word is not None:
# 将结果写入STDOUT
print('{}\t{}'.format(current_word, current_count))
current_count = count
current_word = word
# 最后一个单词不要忘记输出
if current_word == word:
print('{}\t{}'.format(current_word, current_count))
代码测试
在MapReduce作业中正式使用mapper.py和reducer.py之前,最好先在本地测试mapper.py和reducer.py脚本。否则,作业可能成功完成了但没有得到作业结果数据或者得到了不是你想要的结果。
这里有一些想法,关于如何测试这个Map和Reduce脚本的功能。
使用 cat data | map | sort | reduce
这样的顺序。具体测试如下:
root@hadoop102:workspace# cat file/input1.txt | /home/hadoop/workspace/mapper.py | sort -k1,1 | /home/hadoop/workspace/reducer.py
其中/home/hadoop/workspace/file/input1.txt
示例输入文件的内容如下:
在Hadoop上运行Python代码
root@hadoop102:workspace# hadoop fs -mkdir /input
root@hadoop102:workspace# hadoop fs -put file/* /input/
root@hadoop102:workspace# hadoop fs -ls
运行MapReduce作业
运行MapReduce作业,敲入如下命令:
root@hadoop102:workspace#hadoop jar /opt/hadoop-3.1.3/share/hadoop/tools/lib/hadoop-streaming-3.1.3.jar -file mapper.py -mapper mapper.py -file reducer.py -reducer reducer.py -input input/* -output output1
执行完后,将文件从HDFS中拷入到你本地文件系统中
root@hadoop102:workspace# hadoop fs -get output1/* /home/hadoop/workspace/file/output1/
root@hadoop102:workspace#cd /home/hadoop/workspace/file/output1/
root@hadoop102:output1# ls
part-00000 _SUCCESS
一般情况下,Hadoop对每个reducer产生一个输出文件;在我们的示例中,然而它将只创建单个文件,因为输入的文件都很小。
如果你想要在运行的时候修改Hadoop参数,如增加Reduce任务的数量,你可以使用-D选项:
-D mapred.reduce.tasks=16
只能指定reduce的task数量不能指定map的task数量。
改进Mapper和Reducer代码
上面的Mapper和Reducer例子应该给你提供了一种思路,关于如何创建第一个MapReduce程序。重点是代码简洁和易于理解,特别是对于Python语言的初学者。在现实程序中,你可能想要通过Python的迭代器和生成器来优化你的代码。
一般来说,迭代器和生成器有一个优点:序列中的元素在你需要它的时候才会生成。计算资源昂贵或内存紧缺的时候很有用。
advanced_mapper.py
advanced_mapper.py是改进之后的mapper代码:
#! /usr/bin/python
# -*- coding: utf-8 -*-
import sys
def read_input(std_input):
for line in std_input:
# 将line分割成单词
yield line.rstrip().split()
def main(separator='\t'):
# 从标准输入STDIN输入
data = read_input(sys.stdin)
for words in data:
# 将结果写到标准输出,此处的输出会作为reduce的输入
for word in words:
print('{}{}{}'.format(word, separator, 1))
if __name__ == "__main__":
main()
advanced_mapper.py
advanced_reducer.py是改进之后的reducer代码:
#! /usr/bin/python
# -*- coding: utf-8 -*-
from itertools import groupby
from operator import itemgetter
import sys
def read_mapper_output(std_input, separator='\t'):
for line in std_input:
yield line.rstrip().split(separator, 1)
def main(separator='\t'):
# 从STDIN输入
data = read_mapper_output(sys.stdin, separator=separator)
# groupby通过word对多个word-count对进行分组,并创建一个返回连续键和它们的组的迭代器:
# - current_word - 包含单词的字符串(键)
# - group - 是一个迭代器,能产生所有的["current_word", "count"]项
# itemgetter: 用于获取对象的哪些维的数据,itemgetter(0)表示获取第0维
for current_word, group in groupby(data, itemgetter(0)):
try:
total_count = sum(int(count) for current_word, count in group)
print('{}{}{}'.format(current_word, separator, total_count))
except ValueError:
pass
if __name__ == '__main__':
main()
若有侵权,通知即删