Streaming执行Python版Wo

发布时间:2019-09-21 11:04:12编辑:auto阅读(1499)

    一:先写map类

    import sys
    for line in sys.stdin:
    line = line.strip( )
    words = line.split( )
    for word in words:
    print('%s\t%s' % (word, 1))


    二:写reduce类

    import sys
    current_word = None
    current_count = 0
    word = None
    for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t',1)
    try:
    count = int(count)
    except ValueError:
    continue
    if current_word == word:
    current_count += count
    else:
    if current_word:
    print('%s\t%s' % (current_word,current_count))
    current_count = count
    current_word = word
    if current_word == word:
    print('%s\t%s' % (current_word,current_count))


    三:利用hadoop Streaming执行Python的内容。

    hadoop jar /home/hadoop/hadoop-2.6.0-cdh5.5.2/share/hadoop/tools/lib/hadoop-streaming-2.6.0-cdh5.5.2.jar  -input /user/hadoop/aa.txt -output /user/hadoop/python_output -mapper "python mapper.py" -reducer "python reducer.py" -file mapper.py -file reducer.py  


    说明:

    输入和输出路径,本身就是hdfs上的,不需要特殊指定hdfs。

    不加×××部分的引号的话,会报错误:

    Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2

    不加粉色部分的内容的话,会报错误:

    Error: java.lang.RuntimeException: Error in configuring object


关键字