mapper.py
import sys
for line in sys.stdin:
flds = line.rstrip().split('\t')
for item in flds:
print item
reducer.py
import sys
pred_word = None
pred_cnt = 0
for word in sys.stdin:
if pred_word == None:
pred_word = word
if pred_word == word:
pred_cnt += 1
else:
print "\t".join([pred_word, pred_cnt])
pred_word = word
pred_cnt = 1
# last word
print "\t".join([pred_word, pred_cnt])
run.sh
bin/hadoop jar hadoop_streaming.jar \
-files mapper.py,reducer.py \
-D stream.num.map.output.key.fields=1 \
-input /xxx/ooo/setences.txt \
-output /xxx/ooo/wordcnt.txt \
-mapper "python mapper.py" \
-reducer "python reducer.py"
Map阶段分为Read-Map-Collect-Spill-Merge。Read读取数据,拆分为split,对每个Split执行Map函数,然后Map的输出进入Collect阶段。map的输出是(key, value), collect调用partitioner,获得输出(partition, key, value),存入环形缓冲区,并按照key进行快速排序,如果有combiner,按照key进行聚合。缓冲区存储达到阈值时,发生spill,写入磁盘。一次spill吐出一个临时文件,信息还包含每个分区在spill文件中的偏移量,便于快速获取需要的分区。merge阶段,将多个spill临时文件进行归并排序,生成一个大文件,因此每个mapper最后输出一个文件。
Reducer阶段分为Copy-Sort-Reduce。Reducer会定期查看哪些mapper已经完成,并将其output位置放在scheduledCoppies列表里,然后启动多个MapOutputCopier线程,去ScheduledCopies列表里指定的远程位置去拷贝数据。Reducer只拷贝map输出文件里属于自己处理的partition,拷贝的数据存在内存中,放不下则生成文件。Sort阶段,对多个mapper拷贝来的结果进行归并排序,保证全局有序。然后将排序后的结果喂给reduce函数处理,保存处理结果至HDFS。
Shuffle过程就是Collect+Spill+Merge+Copy+Sort,由Hadoop框架实现。这样用户只需要关注业务逻辑相关的map逻辑和reduce逻辑。
reducer的个数用-D mapred.reduce.tasks指定。 mapper的个数由split的数量决定,split的数量由splitSize决定,splitSize=max[minSize, min(maxSize, blockSize)]. 此外,如果文件是压缩格式,也会影响splitSize,gz压缩文件不可以进行分割。
二级排序是指Collect过程会在map输出结果的key前加入partition,reduce阶段首先根据自己负责的partition抓取map端的数据,然后再按照key进行排序。
select
u.name, o.orderid
from
order_info o
join
user_info u
on
o.uid = u.uid
请绘图说明上面的join语句编译为MapReduce的结果。
出处:美团点评技术博客
Hadoop Streaming的参考实现:
mapper.py
import os
import sys
def read_input(file):
for line in file:
yield line.rstrip().split()
def main(separator='\t'):
file_path = os.environ["map_input_file"] # hadoop2.5.0版本后,file_path = os.environ["mapreduce_map_input_file"]
file_source = os.path.basename(os.path.dirname(file_path))
data = read_input(sys.stdin)
for flds in data:
sno = flds[0]
if file_source == 'data_info':
name = flds[1]
print '\t'.join((sno, '0', name))
elif file_source == "data_grade":
courseno = flds[1]
grade = flds[2]
print '\t'.join((sno, '1', courseno, grade))
if __name__ == "__main__":
main()
reducer.py
from operator import itemgetter
from itertools import groupby
import sys
def read_mapper_output(file, separator='\t'):
for line in file:
yield line.rstrip().split(separator)
def main(separator='\t'):
data = read_mapper_output(sys.stdin, separator=separator)
for sno, group in groupby(data, itemgetter(0)):
name =None
if group[1] == '0':
name = group[2]
elif group[1] == '1':
courseno = group[2]
grade = group[3]
if name:
print separator.join((sno, name, courseno, grade))
if __name__ == "__main__":
main()
run_hadoop.sh
#!/bin/bash
CUR_DIR=$(cd `dirname $0`;pwd)
JAR_PATH=".xxxxx/hadoop-streaming.jar"
FILES="$CUR_DIR/mapper.py,$CUR_DIR/reducer.py"
INPUT_PATH="xxxxx/data_info/,oooooo/data_grade"
OUTPUT_PATH="xxxxx"
bin/hadoop jar $JAR_PATH \
-files $FILES \
-D num.key.fields.for.partition=1 \
-D stream.num.map.output.key.fields=2 \
-D mapred.reduce.tasks=5 \
-input $INPUT_PATH \
-output $OUTPUT_PATH \
-mapper "python mapper.py" \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-reducer "python reducer.py"
恭喜你,Hadoop已过关。