Using mrjob in Python to write Hadoop MapReduce jobs


    MapReduce jobs are normally written in Java or with Hadoop Streaming. As a Pythoner who doesn't know Java, I had no choice but to use Streaming for my MapReduce log analysis. Here I want to introduce a module built on top of Streaming: mrjob.


    mrjob lets you write MapReduce jobs in Python and run them on several different platforms. You can:

    • Write multi-step MapReduce jobs in pure Python

    • Test them on your local machine

    • Run them on a Hadoop cluster


    Installing with pip:

    pip install mrjob
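
    To confirm the install worked, a quick optional check (mrjob exposes a __version__ attribute):

    python -c "import mrjob; print(mrjob.__version__)"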


    The script I used for testing:

    #coding:utf-8
    from mrjob.job import MRJob
    import re
    #xiaorui.cc
    #WORD_RE = re.compile(r"[\w']+")
    # match IPv4-style addresses such as 10.7.17.7
    WORD_RE = re.compile(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}")
    class MRWordFreqCount(MRJob):
        def mapper(self, _, line):
            # emit (ip, 1) for every IP found on the line
            for word in WORD_RE.findall(line):
                yield word.lower(), 1
        def combiner(self, word, counts):
            # pre-aggregate on the mapper side to cut shuffle traffic
            yield word, sum(counts)
        def reducer(self, word, counts):
            # final per-IP total
            yield word, sum(counts)
    if __name__ == '__main__':
        MRWordFreqCount.run()


    Usage is fairly simple:

    python i.py -r inline input1 input2 input3 > out — this processes several input files and writes the combined result to the file out.

    Simulate Hadoop locally: python 1.py -r local < input > output

    This redirects the result into output; the output redirection must be given.

    Run on the Hadoop cluster: python 1.py -r hadoop < input > output
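
    Since mrjob's default output protocol writes one tab-separated, JSON-encoded key/value pair per line, the out file is easy to post-process. A minimal sketch (the file name out and the top-10 cut-off are just illustrative):

    #coding:utf-8
    # top_ips.py - sort the job's output by count
    import json

    counts = []
    with open('out') as f:  # 'out' as produced by the inline command above
        for line in f:
            key, value = line.rstrip('\n').split('\t')
            counts.append((json.loads(key), json.loads(value)))

    # print the ten busiest IPs
    for ip, n in sorted(counts, key=lambda kv: kv[1], reverse=True)[:10]:
        print('%s\t%s' % (ip, n))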


    Running the script:

    [root@kspc ~]# python mo.py -r local  <10.7.17.7-dnsquery.log.1> output
    no configs found; falling back on auto-configuration
    no configs found; falling back on auto-configuration
    creating tmp directory /tmp/mo.root.20131224.040935.241241
    reading from STDIN
    writing to /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00000
    > /usr/bin/python mo.py --step-num=0 --mapper /tmp/mo.root.20131224.040935.241241/input_part-00000 | sort | /usr/bin/python mo.py --step-num=0 --combiner > /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00000
    writing to /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00001
    > /usr/bin/python mo.py --step-num=0 --mapper /tmp/mo.root.20131224.040935.241241/input_part-00001 | sort | /usr/bin/python mo.py --step-num=0 --combiner > /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00001
    Counters from step 1:
      (no counters found)
    writing to /tmp/mo.root.20131224.040935.241241/step-0-mapper-sorted
    > sort /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00000 /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00001
    writing to /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00000
    > /usr/bin/python mo.py --step-num=0 --reducer /tmp/mo.root.20131224.040935.241241/input_part-00000 > /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00000
    writing to /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00001
    > /usr/bin/python mo.py --step-num=0 --reducer /tmp/mo.root.20131224.040935.241241/input_part-00001 > /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00001
    Counters from step 1:
      (no counters found)
    Moving /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00000 -> /tmp/mo.root.20131224.040935.241241/output/part-00000
    Moving /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00001 -> /tmp/mo.root.20131224.040935.241241/output/part-00001
    Streaming final output from /tmp/mo.root.20131224.040935.241241/output
    removing tmp directory /tmp/mo.root.20131224.040935.241241

    Resource usage while the job was running:

    [screenshot: 133630767.jpg]


    A neat discovery: mrjob actually shells out to the system sort command to do its sorting, as the process list shows (a single step can even be replayed by hand; see the command after the screenshot).

    [screenshot: 133937941.jpg]
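
    The --step-num/--mapper/--reducer flags in the log above are how mrjob re-invokes the script for each phase, which means a single step can also be replayed by hand, roughly like this (combiner omitted for brevity):

    python mo.py --step-num=0 --mapper < 10.7.17.7-dnsquery.log.1 | sort | python mo.py --step-num=0 --reducer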


    To get a better feel for how mrjob is used, here is another example.


    from mrjob.job import MRJob
    #from xiaorui.cc
    class MRWordFrequencyCount(MRJob):
        # emit per-line statistics
        def mapper(self, _, line):
            yield "chars", len(line)
            yield "words", len(line.split())
            yield "lines", 1
        # sum up the values for each key
        def reducer(self, key, values):
            yield key, sum(values)
    if __name__ == '__main__':
        MRWordFrequencyCount.run()
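
    The feature list above promised multi-step jobs, so here is what that looks like. The sketch below is adapted from the official docs' "most used word" example: step one counts words, step two picks the maximum. MRStep comes from mrjob.step:

    from mrjob.job import MRJob
    from mrjob.step import MRStep
    import re

    WORD_RE = re.compile(r"[\w']+")

    class MRMostUsedWord(MRJob):
        def steps(self):
            # two chained MapReduce steps
            return [
                MRStep(mapper=self.mapper_get_words,
                       combiner=self.combiner_count_words,
                       reducer=self.reducer_count_words),
                MRStep(reducer=self.reducer_find_max_word)
            ]
        def mapper_get_words(self, _, line):
            # emit (word, 1) for every word on the line
            for word in WORD_RE.findall(line):
                yield word.lower(), 1
        def combiner_count_words(self, word, counts):
            yield word, sum(counts)
        def reducer_count_words(self, word, counts):
            # key everything to None so step two sees all pairs together
            yield None, (sum(counts), word)
        def reducer_find_max_word(self, _, word_count_pairs):
            # the pair with the largest count wins
            yield max(word_count_pairs)

    if __name__ == '__main__':
        MRMostUsedWord.run()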


    The result:

    [screenshot: 135509171.jpg]

    Below are some usage notes from the official documentation:


    As you can see, it supports both HDFS and S3 storage!

    Running your job different ways

    The most basic way to run your job is on the command line:

    $ python my_job.py input.txt

    By default, output will be written to stdout.

    You can pass input via stdin, but be aware that mrjob will just dump it to a file first:

    $ python my_job.py < input.txt

    You can pass multiple input files, mixed with stdin (using the - character):

    $ python my_job.py input1.txt input2.txt - < input3.txt

    By default, mrjob will run your job in a single Python process. This provides the friendliest debugging experience, but it’s not exactly distributed computing!

    You can change the way the job is run with the -r/--runner option. You can use -r inline (the default), -r local, -r hadoop, or -r emr.

    To run your job in multiple subprocesses with a few Hadoop features simulated, use -r local.

    To run it on your Hadoop cluster, use -r hadoop.

    If you have Elastic MapReduce configured (see Elastic MapReduce Quickstart), you can run it there with -r emr.

    Your input files can come from HDFS if you’re using Hadoop, or S3 if you’re using EMR:

    $ python my_job.py -r emr s3://my-inputs/input.txt
    $ python my_job.py -r hadoop hdfs://my_home/input.txt
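
    Besides the command line, a job can also be driven from another Python script. A minimal sketch, assuming the IP-counting job above was saved as mo.py (stream_output/parse_output_line match mrjob of this era; newer releases offer cat_output/parse_output instead):

    from mo import MRWordFreqCount

    mr_job = MRWordFreqCount(args=['-r', 'local', '10.7.17.7-dnsquery.log.1'])
    with mr_job.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            key, value = mr_job.parse_output_line(line)
            print('%s\t%s' % (key, value))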


