听说你会Hadoop,写个Word Count让我看看

mapper.py

import sys

for line in sys.stdin:
  flds = line.rstrip().split('\t')
  for item in flds:
    print item

reducer.py

import sys

pred_word = None
pred_cnt = 0

for word in sys.stdin:
  if pred_word == None:
    pred_word = word
  if pred_word == word:
    pred_cnt += 1
  else:
    print "\t".join([pred_word, pred_cnt])
    pred_word = word
    pred_cnt = 1

# last word
print "\t".join([pred_word, pred_cnt])

run.sh

bin/hadoop jar hadoop_streaming.jar \
    -files mapper.py,reducer.py \
    -D stream.num.map.output.key.fields=1 \
    -input /xxx/ooo/setences.txt \
    -output /xxx/ooo/wordcnt.txt \
    -mapper "python mapper.py" \
    -reducer "python reducer.py"

什么是shuffle,详细阐述下

Map阶段分为Read-Map-Collect-Spill-Merge。Read读取数据,拆分为split,对每个Split执行Map函数,然后Map的输出进入Collect阶段。map的输出是(key, value), collect调用partitioner,获得输出(partition, key, value),存入环形缓冲区,并按照key进行快速排序,如果有combiner,按照key进行聚合。缓冲区存储达到阈值时,发生spill,写入磁盘。一次spill吐出一个临时文件,信息还包含每个分区在spill文件中的偏移量,便于快速获取需要的分区。merge阶段,将多个spill临时文件进行归并排序,生成一个大文件,因此每个mapper最后输出一个文件。

Reducer阶段分为Copy-Sort-Reduce。Reducer会定期查看哪些mapper已经完成,并将其output位置放在scheduledCoppies列表里,然后启动多个MapOutputCopier线程,去ScheduledCopies列表里指定的远程位置去拷贝数据。Reducer只拷贝map输出文件里属于自己处理的partition,拷贝的数据存在内存中,放不下则生成文件。Sort阶段,对多个mapper拷贝来的结果进行归并排序,保证全局有序。然后将排序后的结果喂给reduce函数处理,保存处理结果至HDFS。

Shuffle过程就是Collect+Spill+Merge+Copy+Sort,由Hadoop框架实现。这样用户只需要关注业务逻辑相关的map逻辑和reduce逻辑。

看来有两把刷子,怎么控制reducer数和mapper数

reducer的个数用-D mapred.reduce.tasks指定。 mapper的个数由split的数量决定,split的数量由splitSize决定,splitSize=max[minSize, min(maxSize, blockSize)]. 此外,如果文件是压缩格式,也会影响splitSize,gz压缩文件不可以进行分割。

可以可以,再跟我说说什么叫二级排序

二级排序是指Collect过程会在map输出结果的key前加入partition,reduce阶段首先根据自己负责的partition抓取map端的数据,然后再按照key进行排序。

厉害,怎么使用MapReduce实现下面的join语句

select 
    u.name, o.orderid 
from 
    order_info o
join 
    user_info u
on 
    o.uid = u.uid

请绘图说明上面的join语句编译为MapReduce的结果。

出处:美团点评技术博客

image

image

Hadoop Streaming的参考实现:

mapper.py

import os
import sys
def read_input(file):
    for line in file:
        yield line.rstrip().split()

def main(separator='\t'):
    file_path = os.environ["map_input_file"] # hadoop2.5.0版本后,file_path = os.environ["mapreduce_map_input_file"]
    file_source = os.path.basename(os.path.dirname(file_path))
    data = read_input(sys.stdin)
    for flds in data:
        sno = flds[0]
        if file_source == 'data_info':
            name = flds[1]
            print '\t'.join((sno, '0', name))
        elif file_source == "data_grade":
            courseno = flds[1]
            grade = flds[2]
            print '\t'.join((sno, '1', courseno, grade))

if __name__ == "__main__":
    main()

reducer.py

from operator import itemgetter
from itertools import groupby
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator)

def main(separator='\t'):
    data = read_mapper_output(sys.stdin, separator=separator)
    for sno, group in groupby(data, itemgetter(0)):
        name =None
        if group[1] == '0':
            name = group[2]
        elif group[1] == '1':
            courseno = group[2]
            grade = group[3]
            if name:
                print separator.join((sno, name, courseno, grade))

if __name__ == "__main__":
    main()

run_hadoop.sh

#!/bin/bash
CUR_DIR=$(cd `dirname $0`;pwd)
JAR_PATH=".xxxxx/hadoop-streaming.jar"
FILES="$CUR_DIR/mapper.py,$CUR_DIR/reducer.py"
INPUT_PATH="xxxxx/data_info/,oooooo/data_grade"
OUTPUT_PATH="xxxxx"

bin/hadoop jar $JAR_PATH \
    -files $FILES \
    -D num.key.fields.for.partition=1 \
    -D stream.num.map.output.key.fields=2 \
    -D mapred.reduce.tasks=5 \
    -input $INPUT_PATH \
    -output $OUTPUT_PATH \
    -mapper "python mapper.py" \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
    -reducer "python reducer.py"

恭喜你,Hadoop已过关。