大数据处理神器map-reduce实现(仅python和shell版本),map-reducepython
大数据处理神器map-reduce实现(仅python和shell版本),map-reducepython
熟悉java的人直接可以使用java实现map-reduce过程,而像我这种不熟悉java的怎么办?为了让非java程序员方便处理数据,我把使用python,shell实现streaming的过程,也即为map-reduce过程,整理如下:
1.如果数据不在hive里面,而在hdfs中,可以使用下面的方式处理:
python版本的map-reduce
test.py
#!/bin/python
import sys
from operator import itemgetter
from itertools import groupby
def mapper():
f = sys.stdin
for line in f:
line_list = line.strip().split("分隔符")
#coding 这里码码把内容变成key-value的形式,方便reduec的时候按key聚合
def read_input(file):
for line in file:
yield line.strip().split("分隔符")
def reducer():
data = read_input(sys.stdin)
for key, kvalue in groupby(data, itemgetter(0)):
temp_list = [ele[1] for ele in kvalue]
#coding 这里码码实现想要的聚合效果
if __name__ == '__main__':
flag = sys.argv[1]
if flag == "map":
mapper()
elif flag == "reduce":
reducer()
else:
print "error param"
线下测试的时候使用命令:cat filename | python test.py map | sort -k1 | python test.py reduce > text
shell版的map和python版的reduce一块处理数据,重复利用两种脚本处理数据的优势,当然这也是个人习惯而已
shell版的map
test_map.sh
cat | awk ''
使用cat 接收文件,当然也可以不用,个人习惯,然后使用awk等出来数据,把处理的单行数据输出
python的reduce
reduce_test.py
#!/bin/python
import sys
from operator import itemgetter
from itertools import groupby
def read_input(file):
"""Read input and split."""
for line in file:
yield line.rstrip().split('\t')
def reducer():
data = read_input(sys.stdin)
for key, kviter in groupby(data, itemgetter(0)):
temp_list = [ele[1] for ele in kviter]
#coding 这里码码实现想要的聚合效果
if __name__ == '__main__':
flag = sys.argv[1]
if flag == "reduce":
reducer()
else:
print "error param"
2.如果数据在hive里面:
2.1这种情况也可以使用上面的方式处理
2.2可以写hql使用transfrom..... using 的方式处理
例如:
add file test.py; //上传文件到集群节点
select transfrom(a,b,c) using 'python test.py' as (a1,b1) from test_table where partition_name="123";
或者
select transfrom(a,b,c) using 'cat | awk ....' as (a1,b1) from test_table where partition_name="123";
当然,实现这些功能已经完成了大部分工作,还有一些情况可能需要考虑优化技术,比如因为一些原因导致的数据不均衡,使reduce计算过程中出现某个或某几个计算节点数据特别多,造成整体计算时间延长,更甚导致任务失败。
相关文章
- 暂无相关文章
用户点评