Published: 2019-09-22 08:09:12 | Editor: auto | Views: 2338
pygrametl is a Python package for ETL (Extract-Transform-Load).
A quick example
import MySQLdb
from pygrametl.datasources import SQLSource
conn = MySQLdb.connect(host="localhost", user="root", passwd="123456", db="ustcck", charset="utf8")
sql = "SELECT * FROM student;"
newnames = 'ID', 'Name', 'Gender'
resultsSource = SQLSource(connection=conn, query=sql, names=newnames)
print(type(resultsSource))
# Each row is a dict keyed by the names given in newnames
for row in resultsSource:
    print(row)
    print(row["Name"])
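To try the row format without a MySQL server, here is a minimal sketch of the same idea using only the standard library. It does not use pygrametl: sql_rows is a hypothetical helper that mimics the behaviour described above (one dict per row, with the columns renamed), and the student table and its two rows are made-up sample data.

```python
import sqlite3


def sql_rows(connection, query, names=None):
    """Yield each result row as a dict, optionally renaming the columns.

    Hypothetical helper for illustration; not part of pygrametl.
    """
    cursor = connection.cursor()
    cursor.execute(query)
    # Fall back to the column names reported by the database.
    columns = names or [d[0] for d in cursor.description]
    for values in cursor:
        yield dict(zip(columns, values))


# In-memory database with made-up sample data.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE student (id INTEGER, name TEXT, gender TEXT)")
conn.executemany("INSERT INTO student VALUES (?, ?, ?)",
                 [(1, "Alice", "F"), (2, "Bob", "M")])

rows = list(sql_rows(conn, "SELECT * FROM student ORDER BY id",
                     names=("ID", "Name", "Gender")))
for row in rows:
    print(row["Name"])
```

The renaming is purely positional: the first name is applied to the first selected column, and so on, so the order of names has to match the order of columns in the query.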
1. Installation and installation check
$ pip install pygrametl
>>> import pygrametl
>>>
If the import returns without an error, the installation succeeded.
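2. pygrametl supports many data sources
The pygrametl.datasources module exposes the following names, among others:
'BackgroundSource', 'CSVSource', 'CrossTabbingSource', 'DictReader', 'DynamicForEachSource', 'FilteringSource', 'HashJoiningSource', 'JoiningSource', 'MergeJoiningSource', 'Process', 'ProcessSource', 'Queue', 'RoundRobinSource', 'SQLSource', 'TransformingSource',
'TypedCSVSource', 'UnionSource'
Note that 'DictReader', 'Process', and 'Queue' share their names with standard-library classes (csv.DictReader, multiprocessing.Process, multiprocessing.Queue) and are likely imports rather than data sources.
For example: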
(1)
import psycopg2
import pygrametl
from pygrametl.datasources import SQLSource
conn = psycopg2.connect(database="db", user="dbuser", password="dbpass")
sql = "SELECT * FROM table;"
resultsSource = SQLSource(connection=conn, query=sql)
(2)
import pygrametl
from pygrametl.datasources import CSVSource
resultsSource = CSVSource(csvfile=open('ResultsFile.csv', 'r', 16384), delimiter=',')
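A CSV source hands out rows in the same shape as the SQL source: one dict per line, keyed by the header row. The sketch below shows that shape with the standard library's csv.DictReader, which to my understanding is what CSVSource builds on (treat that as an assumption). The sample text stands in for ResultsFile.csv and is made up.

```python
import csv
import io

# Made-up sample data standing in for ResultsFile.csv.
csv_text = "name,score\nAda,90\nLinus,85\n"

# The first line is used as the header; every later line becomes one dict.
source = csv.DictReader(io.StringIO(csv_text), delimiter=',')
rows = list(source)
for row in rows:
    print(row["name"], row["score"])
```

All values come back as strings, so numeric columns need an explicit conversion before they are used as measures.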
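3. Dimension
pygrametl provides classes for working with data warehouse dimensions. Dimension offers an interface for inserting, looking up, and updating rows in a dimension table.
Using Dimension takes two steps:
(1) Create a ConnectionWrapper
(2) Create the Dimension, specifying the table name, its key, and the other columns of the table
The following example uses Dimension to insert rows into the corresponding dimension table (assuming the table already exists with the columns 'productid', 'name', 'category', 'price'):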
import psycopg2
import pygrametl
from pygrametl.tables import Dimension
products = [
    {'name' : 'Calvin and Hobbes 1', 'category' : 'Comic', 'price' : '10'},
    {'name' : 'Calvin and Hobbes 2', 'category' : 'Comic', 'price' : '10'},
    {'name' : 'Calvin and Hobbes 3', 'category' : 'Comic', 'price' : '10'},
    {'name' : 'Cake and Me', 'category' : 'Cookbook', 'price' : '15'},
    {'name' : 'French Cooking', 'category' : 'Cookbook', 'price' : '50'},
    {'name' : 'Sushi', 'category' : 'Cookbook', 'price' : '30'},
    {'name' : 'Nineteen Eighty-Four', 'category' : 'Novel', 'price' : '15'},
    {'name' : 'The Lord of the Rings', 'category' : 'Novel', 'price' : '60'}
]
pgconn = psycopg2.connect("""host='localhost' dbname='dw' user='dwuser'
                          password='dwpass'""")
conn = pygrametl.ConnectionWrapper(connection=pgconn)
productDimension = Dimension(
    name='product',
    key='productid',
    attributes=['name', 'category', 'price'],
    lookupatts=['name'])
for row in products:
    productDimension.insert(row)
conn.commit()
conn.close()
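In the example above, lookupatts=['name'] means that a product is identified by its name. A common pattern on top of that is "look the member up, insert it only if it is missing". The following is a minimal sketch of that pattern with sqlite3 so it runs without PostgreSQL. The lookup and ensure functions here are local helpers written for illustration, not pygrametl's implementation.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE product (
    productid INTEGER PRIMARY KEY,
    name TEXT, category TEXT, price TEXT)""")


def lookup(row):
    """Return the productid for row['name'], or None if it is not stored."""
    found = conn.execute("SELECT productid FROM product WHERE name = ?",
                         (row["name"],)).fetchone()
    return found[0] if found else None


def ensure(row):
    """Return the key of the member, inserting the row first if needed."""
    key = lookup(row)
    if key is None:
        cursor = conn.execute(
            "INSERT INTO product (name, category, price) VALUES (?, ?, ?)",
            (row["name"], row["category"], row["price"]))
        key = cursor.lastrowid
    return key


sushi = {'name': 'Sushi', 'category': 'Cookbook', 'price': '30'}
first = ensure(sushi)
second = ensure(sushi)  # same member again: no second row is inserted
count = conn.execute("SELECT COUNT(*) FROM product").fetchone()[0]
print(first, second, count)
```

Because only the lookup attribute is compared, two rows with the same name but different prices count as the same member in this sketch.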
4. FactTable
FactTable is easiest to understand from an example.
Example:
Three steps:
(1) Establish a connection
(2) Create a ConnectionWrapper instance
(3) Create the FactTable
import MySQLdb
import pygrametl
from pygrametl.tables import FactTable
conn = MySQLdb.connect(host="localhost", user="root", passwd="123", db="ustcck", charset="utf8")
conn = pygrametl.ConnectionWrapper(connection=conn)
factTable = FactTable(
    name='facttable',
    measures=['price'],
    keyrefs=['storeid', 'productid', 'dateid'])
# A list of facts ready to be inserted into the fact table
facts = [{'storeid': 1, 'productid': 13, 'dateid': 4, 'price': 50},
         {'storeid': 2, 'productid': 7, 'dateid': 4, 'price': 75},
         {'storeid': 1, 'productid': 7, 'dateid': 4, 'price': 50},
         {'storeid': 3, 'productid': 9, 'dateid': 4, 'price': 25}]
# The facts are inserted with the insert method and then committed to the DB
for row in facts:
    factTable.insert(row)
conn.commit()
# lookup returns both the keys and the measures given only the keys
factTable.lookup({'storeid': 1, 'productid': 13, 'dateid': 4})
# If some of the facts may already exist in the database, the ensure method
# can be used instead of calling lookup and insert manually. We also rename
# 'itemid' to 'productid' using the name mapping feature
newFacts = [{'storeid': 2, 'itemid': 7, 'dateid': 4, 'price': 75},
            {'storeid': 1, 'itemid': 7, 'dateid': 4, 'price': 50},
            {'storeid': 1, 'itemid': 2, 'dateid': 7, 'price': 150},
            {'storeid': 3, 'itemid': 3, 'dateid': 6, 'price': 100}]
for row in newFacts:
    # The second argument makes FactTable.ensure not only match the keys
    # when deciding whether two facts are equal, but also check that the
    # measures are the same for facts with the same key; if they differ,
    # a ValueError is raised
    factTable.ensure(row, True, {'productid': 'itemid'})
conn.commit()
conn.close()
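The combination of key matching, measure comparison, and name mapping in the ensure call is easier to follow in isolation. Below is a small in-memory sketch of that logic. It is not pygrametl's code: the stored dict stands in for the fact table, and the return convention (True if the fact was already present) is a choice made for this sketch.

```python
KEYREFS = ('storeid', 'productid', 'dateid')
MEASURES = ('price',)

stored = {}  # maps a key tuple to its measures; stands in for the fact table


def ensure(row, compare=False, namemapping=None):
    """Insert the fact unless one with the same keys already exists.

    Returns True if the fact was already present, False if it was inserted.
    """
    namemapping = namemapping or {}

    def value(column):
        # Translate the table column name to the name used in the row.
        return row[namemapping.get(column, column)]

    key = tuple(value(column) for column in KEYREFS)
    measures = {column: value(column) for column in MEASURES}
    if key in stored:
        if compare and stored[key] != measures:
            raise ValueError("fact %r exists with different measures" % (key,))
        return True
    stored[key] = measures
    return False


mapping = {'productid': 'itemid'}
fact = {'storeid': 2, 'itemid': 7, 'dateid': 4, 'price': 75}
existed_first = ensure(fact, True, mapping)   # not stored yet, so inserted
existed_second = ensure(fact, True, mapping)  # same keys and same price
print(existed_first, existed_second)
```

Calling ensure with the same keys but a different price and compare=True raises a ValueError, which is the behaviour the comment in the example above describes.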
5. Bulk Loading
Three classes can be used for bulk loading: BulkDimension, BulkFactTable, and CachedBulkDimension.
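# Bulk loader function for MySQLdb
def mysqlbulkloader(name, attributes, fieldsep, rowsep, nullval, filehandle):
    # connection must be a module-level MySQLdb connection
    global connection
    cursor = connection.cursor()
    # filehandle is interpolated as a file name here, so this loader expects
    # to receive a name rather than an open file object
    sql = "LOAD DATA LOCAL INFILE '%s' INTO TABLE %s FIELDS TERMINATED BY '%s' LINES TERMINATED BY '%s' (%s);" % \
        (filehandle, name, fieldsep, rowsep, ', '.join(attributes))
    cursor.execute(sql)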
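Parameters:
name – the table name
attributes – a sequence of the attribute names
fieldsep – the field separator
rowsep – the row separator
nullval – the substitute written in place of NULL values
filehandle – a file name or a file object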
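The parameters above describe a delimited text file that is handed to the loader function. As a rough illustration of what such a file looks like, the sketch below writes rows to a temporary file using a field separator, a row separator, and a NULL substitute. write_bulk_file is a hypothetical helper, the default separators and the \N marker are assumptions chosen for the example, and no database is involved.

```python
import os
import tempfile


def write_bulk_file(rows, attributes, fieldsep="\t", rowsep="\n", nullval="\\N"):
    """Write rows to a temporary delimited file and return its path.

    Hypothetical helper for illustration; not part of pygrametl.
    """
    handle = tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False,
                                         newline="")
    with handle:
        for row in rows:
            # Replace missing values with the NULL substitute.
            values = [nullval if row[a] is None else str(row[a])
                      for a in attributes]
            handle.write(fieldsep.join(values) + rowsep)
    return handle.name


path = write_bulk_file(
    [{'storeid': 1, 'productid': 13, 'price': 50},
     {'storeid': 2, 'productid': 7, 'price': None}],
    attributes=['storeid', 'productid', 'price'])
with open(path, newline="") as f:
    content = f.read()
os.remove(path)
print(repr(content))
```

A loader such as mysqlbulkloader would receive the path of a file like this together with the same separators, so the SQL statement and the file agree on the format.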