Using pygrametl (Python)

Published: 2019-09-22 08:09:12 | Editor: auto

    pygrametl is a Python package for ETL (Extract-Transform-Load).

    A quick example

    import MySQLdb
    from pygrametl.datasources import SQLSource

    conn = MySQLdb.connect(host="localhost", user="root", passwd="123456", db="ustcck", charset="utf8")

    sql = "SELECT * FROM student;"
    newnames = 'ID', 'Name', 'Gender'
    resultsSource = SQLSource(connection=conn, query=sql, names=newnames)
    print(type(resultsSource))

    for row in resultsSource:
        print(row)
        print(row["Name"])
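
To make the effect of the names argument concrete, here is a minimal sketch in plain Python of the row format the loop above works with: one dict per result row, keyed by the supplied names. It uses the standard-library sqlite3 module so that it runs without a MySQL server. The helper rows_as_dicts and the student table layout are assumptions made for illustration; this is not pygrametl's implementation.

```python
import sqlite3


def rows_as_dicts(connection, query, names=()):
    """Yield each result row as a dict, optionally renaming the columns."""
    cursor = connection.cursor()
    cursor.execute(query)
    # Fall back to the column names reported by the database driver.
    keys = list(names) or [col[0] for col in cursor.description]
    if len(keys) != len(cursor.description):
        raise ValueError("names must have one entry per result column")
    for row in cursor:
        yield dict(zip(keys, row))


# Hypothetical table layout; the original article does not show the schema.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE student (id INTEGER, name TEXT, gender TEXT)")
conn.execute("INSERT INTO student VALUES (1, 'Alice', 'F')")

rows = list(rows_as_dicts(conn, "SELECT * FROM student", ("ID", "Name", "Gender")))
print(rows[0]["Name"])
conn.close()
```

Each row can then be indexed by the new names, which is why row["Name"] works in the example above.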


    1. Installing pygrametl and verifying the installation

    $ pip install pygrametl


    >>> import pygrametl
    >>>

    If the import returns without an error, the installation works.

    2. pygrametl supports many kinds of data sources

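    The pygrametl.datasources module exposes the following names, among others. Most are data source classes; DictReader, Process, and Queue appear to be helpers imported from the standard library rather than data sources.

    'BackgroundSource', 'CSVSource', 'CrossTabbingSource', 'DictReader', 'DynamicForEachSource', 'FilteringSource', 'HashJoiningSource', 'JoiningSource', 'MergeJoiningSource', 'Process', 'ProcessSource', 'Queue', 'RoundRobinSource', 'SQLSource', 'TransformingSource', 'TypedCSVSource', 'UnionSource', ...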

    For example:

    (1)

    import psycopg2
    import pygrametl
    from pygrametl.datasources import SQLSource

    conn = psycopg2.connect(database="db", user="dbuser", password="dbpass")

    sql = "SELECT * FROM table;"
    resultsSource = SQLSource(connection=conn, query=sql)

    (2)

    import pygrametl
    from pygrametl.datasources import CSVSource

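    # CSVSource takes the same arguments as csv.DictReader, whose first
    # parameter is named f; the third argument to open() is the buffer size
    resultsSource = CSVSource(f=open('ResultsFile.csv', 'r', 16384), delimiter=',')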
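
To the best of my knowledge, CSVSource is simply an alias for csv.DictReader from the standard library, so its behaviour can be tried out without pygrametl or a file on disk. The sketch below reads from an in-memory string instead of ResultsFile.csv; the column names and values are invented for illustration.

```python
import csv
import io

# In-memory stand-in for ResultsFile.csv; the columns are invented.
sample = io.StringIO("name,score\nAlice,90\nBob,85\n")

# The header line supplies the dict keys for every following row.
reader = csv.DictReader(sample, delimiter=",")
csv_rows = list(reader)

for row in csv_rows:
    print(row["name"], row["score"])
```

Note that every value arrives as a string, so numeric columns have to be converted before they are used as numbers.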

    3. Dimension

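    pygrametl supports working with data warehouse dimensions. The Dimension class provides an interface for operating on a dimension table, such as inserting rows, looking up keys, and updating rows.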

    Using Dimension takes two steps:

    (1) Create a ConnectionWrapper.

    (2) Create the Dimension, specifying the table name, the key, and the other columns of the table.

    The following example uses Dimension to insert rows into a dimension table. It assumes the table already exists with the columns 'productid', 'name', 'category', and 'price'.

    import psycopg2
    import pygrametl
    from pygrametl.tables import Dimension


    products = [
        {'name' : 'Calvin and Hobbes 1', 'category' : 'Comic', 'price' : '10'},
        {'name' : 'Calvin and Hobbes 2', 'category' : 'Comic', 'price' : '10'},
        {'name' : 'Calvin and Hobbes 3', 'category' : 'Comic', 'price' : '10'},
        {'name' : 'Cake and Me', 'category' : 'Cookbook', 'price' : '15'},
        {'name' : 'French Cooking', 'category' : 'Cookbook', 'price' : '50'},
        {'name' : 'Sushi', 'category' : 'Cookbook', 'price' : '30'},
        {'name' : 'Nineteen Eighty-Four', 'category' : 'Novel', 'price' : '15'},
        {'name' : 'The Lord of the Rings', 'category' : 'Novel', 'price' : '60'}
    ]

    pgconn = psycopg2.connect("""host='localhost' dbname='dw' user='dwuser'
                              password='dwpass'""")

    conn = pygrametl.ConnectionWrapper(connection=pgconn)

    productDimension = Dimension(
        name='product',
        key='productid',
        attributes=['name', 'category', 'price'],
        lookupatts=['name'])

    for row in products:
        productDimension.insert(row)
    conn.commit()
    conn.close()
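
The example above assumes that the product table already exists and that rows are identified by name (the lookupatts). As a rough illustration of that setup, the sketch below creates such a table in an in-memory SQLite database and performs the equivalent insert and key lookup by hand. The column types and the helpers insert_product and lookup_product are assumptions; pygrametl generates keys and SQL in its own way, which may differ.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical schema matching the attributes used above; the column
# types are assumptions.
conn.execute(
    """CREATE TABLE product (
           productid INTEGER PRIMARY KEY,
           name      TEXT,
           category  TEXT,
           price     INTEGER
       )"""
)


def insert_product(row):
    """Insert a row and return the surrogate key SQLite assigned to it."""
    cursor = conn.execute(
        "INSERT INTO product (name, category, price) VALUES (?, ?, ?)",
        (row["name"], row["category"], row["price"]),
    )
    return cursor.lastrowid


def lookup_product(name):
    """Return the productid for a name, or None if it is not present."""
    found = conn.execute(
        "SELECT productid FROM product WHERE name = ?", (name,)
    ).fetchone()
    return found[0] if found else None


first_key = insert_product({"name": "Sushi", "category": "Cookbook", "price": "30"})
print(lookup_product("Sushi") == first_key)
print(lookup_product("Unknown"))
```

The rows passed in carry no productid: the key is assigned on insert and recovered later through the lookup attribute.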

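    4. FactTable. An example is the quickest way to see how it works.

    Example:

    It takes three steps:

    (1) Open a database connection.

    (2) Create a ConnectionWrapper instance.

    (3) Create the FactTable.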

    import MySQLdb
    import pygrametl
    from pygrametl.tables import FactTable

    conn = MySQLdb.connect(host="localhost", user="root", passwd="123", db="ustcck", charset="utf8")

    conn = pygrametl.ConnectionWrapper(connection=conn)

    factTable = FactTable(
        name='facttable',
        measures=['price'],
        keyrefs=['storeid', 'productid', 'dateid'])

    # A list of facts ready to be inserted into the fact table
    facts = [{'storeid': 1, 'productid': 13, 'dateid': 4, 'price': 50},
             {'storeid': 2, 'productid': 7, 'dateid': 4, 'price': 75},
             {'storeid': 1, 'productid': 7, 'dateid': 4, 'price': 50},
             {'storeid': 3, 'productid': 9, 'dateid': 4, 'price': 25}]


    # The facts can be inserted using the insert method, before committing to DB
    for row in facts:
        factTable.insert(row)
    conn.commit()

    # lookup returns both the keys and the measures when given only the keys
    factTable.lookup({'storeid': 1, 'productid': 13, 'dateid': 4})

    # If some of the facts may already exist in the database, the ensure method
    # can be used instead of calling lookup and insert manually. The name mapping
    # feature is also used here to rename 'itemid' to 'productid'
    newFacts = [{'storeid': 2, 'itemid': 7, 'dateid': 4, 'price': 75},
                {'storeid': 1, 'itemid': 7, 'dateid': 4, 'price': 50},
                {'storeid': 1, 'itemid': 2, 'dateid': 7, 'price': 150},
                {'storeid': 3, 'itemid': 3, 'dateid': 6, 'price': 100}]

    for row in newFacts:
        # The second argument forces FactTable.ensure to not only match the keys
        # for facts to be considered equal, but also checks if the measures are
        # the same for facts with the same key, and if not raises a ValueError
        factTable.ensure(row, True, {'productid': 'itemid'})
    conn.commit()
    conn.close()
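
The behaviour of ensure described in the comments above can be sketched in plain Python with a dict standing in for the fact table. The function ensure_fact and its return convention (True when the fact already existed) are assumptions made for this illustration; it is not pygrametl's code.

```python
def ensure_fact(table, row, keyrefs, measures, compare=False, namemapping=None):
    """Insert row into table unless a fact with the same keys exists.

    table is a dict keyed by the tuple of key values. Returns True when
    the fact already existed and False when it was inserted.
    """
    namemapping = namemapping or {}

    def value(column):
        # Resolve a table column to the name it has in the incoming row.
        return row[namemapping.get(column, column)]

    key = tuple(value(k) for k in keyrefs)
    new_measures = {m: value(m) for m in measures}
    if key in table:
        if compare and table[key] != new_measures:
            raise ValueError("fact exists with different measures: %r" % (key,))
        return True
    table[key] = new_measures
    return False


keyrefs = ["storeid", "productid", "dateid"]
measures = ["price"]
table = {}
for fact in [{"storeid": 1, "productid": 13, "dateid": 4, "price": 50},
             {"storeid": 2, "productid": 7, "dateid": 4, "price": 75}]:
    ensure_fact(table, fact, keyrefs, measures)

mapping = {"productid": "itemid"}
first_existed = ensure_fact(
    table, {"storeid": 2, "itemid": 7, "dateid": 4, "price": 75},
    keyrefs, measures, True, mapping)
second_existed = ensure_fact(
    table, {"storeid": 1, "itemid": 2, "dateid": 7, "price": 150},
    keyrefs, measures, True, mapping)
print(first_existed, second_existed, len(table))
```

With compare enabled, a row whose keys match an existing fact but whose price differs raises ValueError instead of being silently skipped.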

    5. Bulk Loading (loading large volumes of data)

    Three classes can be used for bulk loading: BulkDimension, BulkFactTable, and CachedBulkDimension.

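    # Bulk loader for MySQLdb. It relies on a module-level `connection` object.
    # The value of filehandle is placed directly in the LOAD DATA statement,
    # so this loader expects a file name rather than a file object.
    def mysqlbulkloader(name, attributes, fieldsep, rowsep, nullval, filehandle):
        global connection
        cursor = connection.cursor()
        sql = "LOAD DATA LOCAL INFILE '%s' INTO TABLE %s FIELDS TERMINATED BY '%s' LINES TERMINATED BY '%s' (%s);" % \
                (filehandle, name, fieldsep, rowsep, ', '.join(attributes))
        cursor.execute(sql)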

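    Parameters:
    name – the table name
    attributes – a sequence of the attribute (column) names
    fieldsep – the field (attribute) separator
    rowsep – the row separator
    nullval – the value written in place of NULL
    filehandle – a file name or a file object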
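
To show how these six parameters fit together, the following sketch writes a few rows to a temporary file in delimited form and then calls a loader function with the same signature. The helper write_bulk_file, the stand-in recording_bulkloader, and the choice of "\N" as the NULL marker are assumptions made for illustration; a real loader would run a statement such as LOAD DATA against the database instead of recording its arguments.

```python
import os
import tempfile


def write_bulk_file(rows, attributes, fieldsep="\t", rowsep="\n", nullval="\\N"):
    """Write rows to a temp file in delimited form and return its path."""
    handle = tempfile.NamedTemporaryFile(
        "w", suffix=".txt", delete=False, newline="")
    with handle:
        for row in rows:
            # Replace None with the NULL marker and stringify the rest.
            values = [nullval if row[a] is None else str(row[a])
                      for a in attributes]
            handle.write(fieldsep.join(values) + rowsep)
    return handle.name


calls = []


def recording_bulkloader(name, attributes, fieldsep, rowsep, nullval, filehandle):
    """Stand-in loader: record the arguments instead of touching a database."""
    with open(filehandle, newline="") as f:
        calls.append((name, list(attributes), f.read()))


attributes = ["storeid", "productid", "dateid", "price"]
facts = [{"storeid": 1, "productid": 13, "dateid": 4, "price": 50},
         {"storeid": 2, "productid": 7, "dateid": 4, "price": None}]

path = write_bulk_file(facts, attributes)
recording_bulkloader("facttable", attributes, "\t", "\n", "\\N", path)
os.remove(path)
print(repr(calls[0][2]))
```

The loader only ever sees the table name, the column order, the separators, the NULL marker, and the file, which is everything a bulk-load statement needs.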

