Batch generation of Sqoop scripts


    • Generate Hive CREATE TABLE statements from the all_tab_columns dictionary view

    create or replace view create_sql as
    --generate Hive CREATE TABLE statements from the all_tab_columns dictionary view
    select owner,table_name, case

         when nm = 1 then
          'create table ' || owner || '.' || TABLE_NAME || ' (' ||
          COLUMN_NAME || ' ' || DATA_TYPE || ','
         when np = 1 then
          COLUMN_NAME || ' ' || DATA_TYPE || ') partitioned by (dt string);'
         else
          COLUMN_NAME || ' ' || DATA_TYPE || ','
       end create_sql

    from (

        SELECT OWNER,
                TABLE_NAME,
                COLUMN_NAME,
                CASE
                  WHEN DATA_TYPE IN ('VARCHAR2','LONG','CLOB','CHAR','NCHAR','BLOB') THEN
                   'STRING'
                  WHEN DATA_TYPE IN ('NUMBER') then
                   'DECIMAL' || '(' || DATA_PRECISION || ',' || DATA_SCALE || ')'
                  WHEN DATA_TYPE IN ('DATE', 'TIMESTAMP(6)') THEN
                   'STRING'
                  ELSE
                   DATA_TYPE||'(' || DATA_PRECISION || ',' || DATA_SCALE || ')'
                END DATA_TYPE,
                COLUMN_ID,
                NM,
                NP
          FROM (select t.*,
                        row_number() over(partition by owner, TABLE_NAME order by COLUMN_ID) nm,
                        row_number() over(partition by owner, TABLE_NAME order by COLUMN_ID desc) np
                 
                   from all_tab_columns t
                  ))

    ORDER BY OWNER, TABLE_NAME, COLUMN_ID;
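
    As a quick illustration of how the view might be consumed, here is a minimal sketch (assuming the cx_Oracle driver; the connection string and the 'BS' owner filter are placeholders) that dumps the generated statements into an .hql file:

    import cx_Oracle

    # placeholder credentials/DSN - replace with the real connection details
    conn = cx_Oracle.connect('sqoopuser', 'password', 'dbhost:1521/orcl')
    cur = conn.cursor()
    # create_sql is the view defined above; its create_sql column holds one DDL fragment per source column
    cur.execute("select create_sql from create_sql where owner = :owner", {'owner': 'BS'})
    with open('create_hive_tables.hql', 'w', encoding='utf-8') as f:
        for (stmt,) in cur:
            f.write(stmt + '\n')
    conn.close()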


    • Generate the configuration table for the Sqoop jobs
    select
    owner||table_name job_name, --job name
    '' table_type, --table type: dim = dimension, fact_i = insert-only fact, fact_iud = fact with insert/update/delete
    '' partition, --whether the Hive table is partitioned: 0 = no, 1 = yes
    'BS' source_db, --source business system, usually the source schema/user or database name
    owner||'.'||table_name source_table, --source table name
    '' datatime_cloumn, --incremental timestamp column
    'APPEND' incremental, --load mode: append = incremental load, overwrite = full load
    '' SPLIT_BY, --split column for parallelism; pick a column with few duplicate values
    'APP_NO' row_key, --primary key column
    'fz_bs' hive_db, --target Hive database
    table_name hive_table, --target Hive table; must be a bare table name without a database prefix
    '' check_column, --check column on the source table
    columns
    from (
    select owner,table_name,wm_concat(column_name)over(partition by owner,table_name order by column_id) columns,rn from (
    select owner,table_name,column_name,column_id,row_number()over(partition by owner,table_name order by column_id desc) rn from all_tab_columns
    where owner||'_'||table_name in(
    'BS_S_FAULT_RPT',
    'BS_ARC_S_FAULT_RPT',
    'HIS_ARC_S_FAULT_RPT',
    'BS_S_FAULT_HANDLE',
    'BS_ARC_S_FAULT_HANDLE',
    'HIS_ARC_S_FAULT_HANDLE',
    'BS_S_RETVISIT',
    'BS_ARC_S_RETVISIT',
    'HIS_ARC_S_RETVISIT',
    'BS_S_95598_WKST_RELA',
    'HIS_S_95598_WKST_RELA')
    order by owner,table_name,column_id
    ))
    where rn=1
    order by owner,table_name;
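
    The generator below expects a tab-separated tablelist file whose header row supplies the dict keys. A hypothetical one-row example (all values are illustrative; note the extra pattition_by column, which the dim-table merge logic reads but the query above does not emit, so it has to be added by hand):

    # write a sample tab-separated tablelist; the header names must match the keys create_job() reads
    header = ['job_name', 'table_type', 'partition', 'source_db', 'source_table',
              'datatime_cloumn', 'incremental', 'split_by', 'row_key',
              'hive_db', 'hive_table', 'check_column', 'columns', 'pattition_by']
    row = ['bss_fault_rpt', 'dim', '0', 'BS', 'BS.S_FAULT_RPT',
           'LAST_MODIFY_TIME', 'APPEND', 'APP_NO', 'APP_NO',
           'fz_bs', 's_fault_rpt', 'LAST_MODIFY_TIME', 'APP_NO,FAULT_DESC', 'APP_NO']
    with open('tablelist', 'w', encoding='utf-8') as f:
        f.write('\t'.join(header) + '\n')
        f.write('\t'.join(row) + '\n')

    json_make() then turns this file into tablelist.json, a list with one dict per data row, which is what create_job() consumes.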


    • Below is the Python script that auto-generates the Sqoop jobs

    Hive jobs

    #!/usr/bin/python3
    # coding=utf-8

    import json,os
    def json_make(input_file='./tablelist'):

    #input_file ='./sqoopjob/tablelist'
    lines = open(input_file, "r",encoding="utf_8_sig").readlines()
    # drop blank lines before stripping (removing items while iterating skips entries)
    lines = [line for line in lines if line not in ('', '\n')]
    lines = [line.strip() for line in lines]
    
    # the header row supplies the keys
    keys = lines[0].split('\t')
    line_num = 1
    total_lines = len(lines)
    parsed_datas = []
    while line_num < total_lines:
            values = lines[line_num].split("\t")
            parsed_datas.append(dict(zip(keys, values)))
            line_num = line_num + 1
    json_str = json.dumps(parsed_datas, ensure_ascii=False, indent=4)
    output_file = input_file+'.json'
    
    # write to the file
    f = open(output_file, "w", encoding="utf-8")
    f.write(json_str)
    f.close()
    print('JSON conversion finished; see %s' % output_file)
    
    

    def create_job(tablelist):

    if os.path.exists('sqoopjob'):
        os.system('rm -fr sqoopjob/*')
    else:
        os.mkdir('sqoopjob')
    # make sure the output sub-directories exist on every run
    os.system('mkdir  sqoopjob/hive-bin')
    os.system('mkdir  sqoopjob/impala-bin')
    os.system('mkdir  sqoopjob/partition-bin')
    os.system('mkdir  sqoopjob/job-bin')
    sqoopmeta='jdbc:hsqldb:hsql://localhost:16000/sqoop'
    jdbc='jdbc:oracle:thin:@10.90.87.35:11521:bdccdb2 --username sqoopuser --password Bigdata_2016'
    sjf=open('sqoopjob/createjob.sh', "w")
    hcf = open('./sqoopjob/hql_corntab.cron', 'w')
    imcf=open('./sqoopjob/imql_corntab.cron', 'w')
    crontabf=open('./sqoopjob/crontab.cron', 'w')
    df=open('./sqoopjob/deletejob.sh', 'w')
    #scmd=open('./sqoopjob/sqoopcmd.sh', 'w')
    #scmd.write('''yesdate=`date -d last-day +%Y-%m-%d`;todday=`date +%Y-%m-%d`''')
    #cf=open('./sqoopjob/cron.sh', 'w')
    kerboros='kinit -kt /keytab/sqoopdba.keytab sqoopdba \n'
    
    with open(tablelist, 'r') as load_f:
        load_dict = json.load(load_f)
        for job in load_dict:
            if job['table_type'].lower()=='dim' and job['partition']=='0':
                #dimension/archive tables
                hivetruncate='''hive -e"use {hive_db};truncate table {hive_table}_new"\n'''.format(hive_db=job['hive_db'],hive_table=job['hive_table'])
                createjob = '''sqoop  job --create {job_name} --meta-connect {sqoopmeta} \\
                        -- import --connect {jdbc} \\
                        --table {source_table} \\
                        --incremental append \\
                        --split-by {split_by} \\
                        --hive-import \\
                        --hive-drop-import-delims \\
                        --hive-overwrite \\
                        --hive-table {hive_db}.{hive_table}_NEW \\
                        --check-column  {check_column} \\
                        --last-value '2011-01-01 11:00:00' \\
                        -m 2;\n'''.format(
                        job_name=job['job_name'].lower(),
                        sqoopmeta=sqoopmeta,
                        jdbc=jdbc,
                        hive_db=job['hive_db'],
                        source_table=job['source_table'].upper(),
                        split_by=job['split_by'].upper(),
                        hive_table=job['hive_table'].upper(),
                        check_column=job['datatime_cloumn'].upper()
                        ).replace('    ','')
    
                execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                        --exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ','')
    
                deledrop = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                        --delete {job_name};\n'''.format(job_name=job['job_name'].lower() ).replace('    ','')
    
                hql = '''hive -e "use {hive_db};
                    create table {tmp_table} as
                    select {columns} from
                    (select t.*,row_number()over(partition by {pattition_by} ORDER BY  t.{order_by} desc) as rn
                    from (select * from {table_name}_new union all select * from {table_name}) t
                    ) tm
                    where rn =1;
                    drop table {table_name};
                    alter table {table_name}_TMP rename to {table_name};
                    "\n'''.format(
                                        hive_db=job['hive_db'],
                                        tmp_table=job['source_table'].replace('.', '_')+'_tmp',
                                        columns=job['columns'],
                                        pattition_by=','.join(['t.' + cl for cl in job['pattition_by'].split(',')]),
                                        order_by=job['datatime_cloumn'],
                                        table_name=job['source_table'].replace('.', '_'),
                                        ).replace('    ','')
                imql = '''impala-shell -i bigdata-w-001 -q "invalidate metadata;use {hive_db};
                    create table {tmp_table} as
                    select {columns} from
                    (select t.*,row_number()over(partition by {pattition_by} ORDER BY  t.{order_by} desc) as rn
                    from (select * from {table_name}_new union all select * from {table_name}) t
                    ) tm
                    where rn =1;
                    drop table {table_name};
                    alter table {table_name}_TMP rename to {table_name};
                    "\n'''.format(
                                        hive_db=job['hive_db'],
                                        tmp_table=job['source_table'].replace('.', '_') + '_tmp',
                                        columns=job['columns'],
                                        pattition_by=','.join(['t.' + cl for cl in job['pattition_by'].split(',')]),
                                        order_by=job['datatime_cloumn'],
                                        table_name=job['source_table'].replace('.', '_'),
                                    ).replace('    ','')
                #print(sjf)
                sjf.write(hivetruncate+createjob)
                df.write(deledrop)
                open('./sqoopjob/hive-bin/' + job['job_name'].lower() + '_hql.sh', 'w').write(kerboros + execjob  + hql)
                open('./sqoopjob/impala-bin/' + job['job_name'].lower() + '_imql.sh', 'w').write(kerboros + execjob  + imql)
                hcf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}_hql.sh >>../logs/{job_name}_hql.out 2>&1\n'''.format(
                    job_name=job['job_name'].lower()
                        )
                        )
                imcf.write(
                '''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}_imql.sh >>../logs/{job_name}_imql.out 2>&1\n'''.format(
                    job_name=job['job_name'].lower()))
                execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                        --exec {job_name};\n'''.format(
                    job_name=job['job_name'].lower()).replace('    ','')
                open('./sqoopjob/exec_run.sh', 'a').write(execjob)
            elif job['table_type'].lower()=='fact_iud'and job['partition']=='0':
                # fact tables with inserts, updates and deletes: truncate and reload in full
                hivetruncate = '''hive -e"use {hive_db};truncate table {hive_table}"\n'''.format(
                    hive_db=job['hive_db'], hive_table=job['hive_table'])
                createjob = '''sqoop  job --create {job_name} --meta-connect {sqoopmeta} \\
                        -- import --connect {jdbc} \\
                        --table {source_table} \\
                        --split-by {split_by} \\
                        --hive-import \\
                        --hive-drop-import-delims \\
                        --hive-table {hive_db}.{hive_table} \\
                        --delete-target-dir \\
                        -m 2;\n'''.format(
                                                job_name=job['job_name'].lower(),
                                                sqoopmeta=sqoopmeta,
                                                jdbc=jdbc,
                                                hive_db=job['hive_db'],
                                                source_table=job['source_table'].upper(),
                                                split_by=job['split_by'].upper(),
                                                hive_table=job['hive_table'].upper(),
                                            ).replace('    ','')
                sjf.write(hivetruncate+createjob)
                execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                                        --exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ',
                                                                                                                 '')
                open('./sqoopjob/job-bin/' + job['job_name'].lower() + '.sh', 'w').write(kerboros + execjob)
                open('./sqoopjob/exec_run.sh', 'a').write(execjob)
                crontabf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}.sh >>../logs/{job_name}.out 2>&1\n'''.format(
                        job_name=job['job_name'].lower()))
            elif job['table_type'].lower()=='fact_i'and job['partition']=='0':
                # online fact tables with insert-only writes: load incrementally
                hivetruncate = '''hive -e"use {hive_db};truncate table {hive_table}"\n'''.format(
                    hive_db=job['hive_db'], hive_table=job['hive_table'])
                createjob = '''sqoop  job --create {job_name} --meta-connect {sqoopmeta} \\
                                        -- import --connect {jdbc} \\
                                        --table {source_table} \\
                                        --incremental append \\
                                        --split-by {split_by} \\
                                        --hive-import \\
                                        --hive-drop-import-delims \\
                                        --hive-table {hive_db}.{hive_table} \\
                                        --check-column  {check_column} \\
                                        --last-value '2011-01-01 11:00:00' \\
                                        -m 2;\n'''.format(
                                                                    job_name=job['job_name'].lower(),
                                                                    sqoopmeta=sqoopmeta,
                                                                    jdbc=jdbc,
                                                                    hive_db=job['hive_db'],
                                                                    source_table=job['source_table'].upper(),
                                                                    split_by=job['split_by'].upper(),
                                                                    hive_table=job['hive_table'].upper(),
                                                                    check_column=job['datatime_cloumn'].upper()
                                                                ).replace('    ', '')
                sjf.write(hivetruncate+createjob)
                execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                        --exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ', '')
                open('./sqoopjob/job-bin/' + job['job_name'].lower() + '.sh', 'w').write(kerboros + execjob)
                open('./sqoopjob/exec_run.sh', 'a').write(execjob)
                crontabf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}.sh >>../logs/{job_name}.out 2>&1\n'''.format(
                        job_name=job['job_name'].lower()))
    
    
            elif job['partition']=='1' and job['table_type'] in ['fact_i','fact_iud']:
                #partitioned fact tables: pull the data with a WHERE condition on the date range
                shell_cmd='''if [ $# -gt 1 ]; then
                            yesdate=$1
                            today=$2
                            var_len1=`echo ${yesdate} |wc -L`
                            var_len2=`echo ${today} |wc -L`
                            if [[ ${var_len1} != 10 || ${var_len2} != 10 ]];then
                                echo 'invalid arguments'
                                echo 'expected input like: 2017-01-01 2017-01-21'
                                exit
                            fi
                        else
                            yesdate=`date -d "today -1 day " +%Y-%m-%d`
                            today=`date -d today +%Y-%m-%d`
                        fi
    
                    echo "data:${yesdate}  ${today}"\n'''.replace('    ',' ')
                createjob = '''hive -e"use {hive_db};alter table {hive_table} drop if  exists  partition (dt='$yesdate');
                        alter table {hive_table} add partition(dt='$yesdate') "
                        sqoop  import --connect {jdbc} \\
                        --table {source_table} \\
                        --where "{where} >= date'$yesdate' and {where}<date'$today' " \\
                        --split-by {split_by} \\
                        --hive-import \\
                        --hive-partition-key dt  \\
                        --hive-partition-value  $yesdate \\
                        --hive-drop-import-delims \\
                        --hive-table {hive_db}.{hive_table} \\
                        --delete-target-dir \\
                        -m 2;\n'''.format(
                                                job_name=job['job_name'].lower(),
                                                hive_db=job['hive_db'],
                                                sqoopmeta=sqoopmeta,
                                                jdbc=jdbc,
                                                source_table=job['source_table'].upper(),
                                                where=job['datatime_cloumn'],
                                                split_by=job['split_by'].upper(),
                                                hive_table=job['hive_table'].upper(),
                                            ).replace('    ','')
                #scmd.write(createjob)
                open('./sqoopjob/partition-bin/'+job['job_name'].lower()+'.sh', 'w').write(shell_cmd+createjob)
                open('./sqoopjob/exec_run.sh', 'a').write(shell_cmd+createjob)
                crontabf.write(
                    '''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}.sh >>../logs/{job_name}.out 2>&1\n'''.format(
                        job_name=job['job_name'].lower()))
            elif job['partition'] == '1' and job['table_type'] in ['dim']:
                # partitioned dimension tables: reload yesterday's partition
                shell_cmd = '''if [ $# -gt 1 ]; then
                        yesdate=$1
                        today=$2
                        var_len1=`echo ${yesdate} |wc -L`
                        var_len2=`echo ${today} |wc -L`
                        if [[ ${var_len1} != 10 || ${var_len2} != 10 ]];then
                            echo 'invalid arguments'
                            echo 'expected input like: 2017-01-01 2017-01-21'
                            exit
                        fi
                    else
                        yesdate=`date -d "today -1 day " +%Y-%m-%d`
                        today=`date -d today +%Y-%m-%d`
                    fi
    
                echo "data:${yesdate}  ${today}"\n'''.replace('    ',' ')
                createjob = '''hive -e"use {hive_db};alter table {hive_table} drop if  exists  partition (dt='$yesdate');
                           alter table {hive_table} add partition(dt='$yesdate') "
                           sqoop  import --connect {jdbc} \\
                           --table {source_table} \\
                           --split-by {split_by} \\
                           --hive-import \\
                           --hive-partition-key dt  \\
                           --hive-partition-value  $yesdate \\
                           --hive-drop-import-delims \\
                           --hive-table {hive_db}.{hive_table} \\
                           --delete-target-dir \\
                           -m 2;\n'''.format(
                                                    job_name=job['job_name'].lower(),
                                                    hive_db=job['hive_db'],
                                                    sqoopmeta=sqoopmeta,
                                                    jdbc=jdbc,
                                                    source_table=job['source_table'].upper(),
                                                    where=job['datatime_cloumn'],
                                                    split_by=job['split_by'].upper(),
                                                    hive_table=job['hive_table'].upper(),
                                                    ).replace('    ', '')
                #scmd.write(createjob)
                open('./sqoopjob/partition-bin/' + job['job_name'].lower() + '.sh', 'w').write(shell_cmd+createjob)
                open('./sqoopjob/exec_run.sh', 'a').write(shell_cmd+createjob)
                crontabf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}.sh >>../logs/{job_name}.out 2>&1\n'''.format(
                        job_name=job['job_name'].lower()))
    sjf.close()
    hcf.close()
    imcf.close()
    df.close()
    #cf.close()
    print('Script generation finished; see ./sqoopjob/*')
    

    if __name__=='__main__':

    #generate the json file
    json_make(input_file='./tablelist')
    #generate the sqoop scripts
    create_job(tablelist='tablelist.json')
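
    One possible way to roll the generated artifacts out after the script has run (a sketch only; /root/hive_import/bin and /root/hive_import/logs are assumptions taken from the crontab entries the script writes, and only crontab.cron is installed here; hql_corntab.cron and imql_corntab.cron would be merged in the same way):

    import subprocess

    # register the sqoop jobs in the metastore
    subprocess.run('sh sqoopjob/createjob.sh', shell=True, check=True)
    # stage the per-table shell scripts where the cron entries expect them
    subprocess.run('mkdir -p /root/hive_import/bin /root/hive_import/logs', shell=True, check=True)
    subprocess.run('cp sqoopjob/*-bin/*.sh /root/hive_import/bin/', shell=True, check=True)
    # install the weekly schedule
    subprocess.run('crontab sqoopjob/crontab.cron', shell=True, check=True)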
    
    
    

    Below is the script that generates the HBase jobs


    #!/usr/bin/python3
    # coding=utf-8

    import json,os
    def json_make(input_file='./hbase_tablelist'):

    #input_file ='./sqoopjob/tablelist'
    lines = open(input_file, "r",encoding="utf_8_sig").readlines()
    # drop blank lines before stripping (removing items while iterating skips entries)
    lines = [line for line in lines if line not in ('', '\n')]
    lines = [line.strip() for line in lines]
    
    # the header row supplies the keys
    keys = lines[0].split('\t')
    line_num = 1
    total_lines = len(lines)
    parsed_datas = []
    while line_num < total_lines:
            values = lines[line_num].split("\t")
            parsed_datas.append(dict(zip(keys, values)))
            line_num = line_num + 1
    json_str = json.dumps(parsed_datas, ensure_ascii=False, indent=4)
    output_file = input_file+'.json'
    
    # write to the file
    f = open(output_file, "w", encoding="utf-8")
    f.write(json_str)
    f.close()
    print('JSON conversion finished; see %s' % output_file)
    
    

    def create_job(tablelist):

    if os.path.exists('sqoopjob'):
        os.system('rm -fr sqoopjob/*')
    else:
        os.mkdir('sqoopjob')
    # make sure the output sub-directory exists on every run
    os.system('mkdir  sqoopjob/hbase-bin')
    sqoopmeta='jdbc:hsqldb:hsql://localhost:16000/sqoop'
    jdbc='jdbc:oracle:thin:@10.90.87.35:11521:bdccdb2 --username sqoopuser --password Bigdata_2016'
    sjf=open('sqoopjob/createjob.sh', "w")
    crontabf=open('./sqoopjob/crontab.cron', 'w')
    df=open('./sqoopjob/deletejob.sh', 'w')
    kerboros='kinit -kt /keytab/sqoopdba.keytab sqoopdba \n'
    
    with open(tablelist, 'r') as load_f:
        load_dict = json.load(load_f)
        for job in load_dict:
            if job['table_type'].lower()=='dim' and job['partition']=='0':
                #dimension/archive tables
                createjob = '''sqoop  job --create {job_name} --meta-connect {sqoopmeta} \\
                        -- import --connect {jdbc} \\
                        --table {source_table} \\
                        --incremental append \\
                        --split-by {split_by} \\
                        --hbase-create-table \\
                        --hbase-table {hive_table} \\
                        --check-column  {check_column} \\
                        --last-value '2011-01-01 11:00:00' \\
                        --hbase-row-key {row_key} \\
                        --column-family cf \\
                        -m 2;\n'''.format(
                        job_name=job['job_name'].lower(),
                        sqoopmeta=sqoopmeta,
                        jdbc=jdbc,
                        source_table=job['source_table'].upper(),
                        split_by=job['split_by'].upper(),
                        hive_table=job['hive_table'].upper(),
                        check_column=job['datatime_cloumn'].upper(),
                        row_key=job['row_key']
                        ).replace('    ','')
    
                execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                        --exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ','')
    
                deledrop = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                        --delete {job_name};\n'''.format(job_name=job['job_name'].lower() ).replace('    ','')
    
                createtable=''' CREATE EXTERNAL TABLE {hive_table}({columns})
                    STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
                    WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:LAST_ANALYZED,cf:SAMPLE_SIZE,cf:CHARACTER_SET_NAME")
                    TBLPROPERTIES("hbase.table.name" = "{hive_table}", "hbase.mapred.output.outputtable" = "{hive_table}");
                    '''.format(hive_table=job['hive_table'],
                               columns=' string,'.join(job['columns'].split(',')) + ' string')
                #print(sjf)
                sjf.write(createjob)
    
                df.write(deledrop)
                open('./sqoopjob/hbase-bin/' + job['job_name'].lower() + '.sh', 'w').write(execjob)
                crontabf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}.sh >>../logs/{job_name}.out 2>&1\n'''.format(
                    job_name=job['job_name'].lower()))
                execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                        --exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ','')
                open('./sqoopjob/exec_run.sh', 'a').write(execjob)
            else :pass
    
    sjf.close()
    df.close()
    #cf.close()
    print('Script generation finished; see ./sqoopjob/*')
    

    if __name__=='__main__':

    #generate the json file
    json_make(input_file='./hbase_tablelist')
    #generate the sqoop scripts
    create_job(tablelist='./hbase_tablelist.json')
    
    
    

    HBase jobs

    #!/usr/bin/python3
    # coding=utf-8

    import json,os
    def json_make(input_file='./hbase_tablelist'):

    #input_file ='./sqoopjob/tablelist'
    lines = open(input_file, "r",encoding="utf_8_sig").readlines()
    # drop blank lines before stripping (removing items while iterating skips entries)
    lines = [line for line in lines if line not in ('', '\n')]
    lines = [line.strip() for line in lines]
    
    # the header row supplies the keys
    keys = lines[0].split('\t')
    line_num = 1
    total_lines = len(lines)
    parsed_datas = []
    while line_num < total_lines:
            values = lines[line_num].split("\t")
            parsed_datas.append(dict(zip(keys, values)))
            line_num = line_num + 1
    json_str = json.dumps(parsed_datas, ensure_ascii=False, indent=4)
    output_file = input_file+'.json'
    
    # write to the file
    f = open(output_file, "w", encoding="utf-8")
    f.write(json_str)
    f.close()
    print('JSON conversion finished; see %s' % output_file)
    
    

    def create_job(tablelist):

    if os.path.exists('sqoopjob'):
        os.system('rm -fr sqoopjob/*')
    else:
        os.mkdir('sqoopjob')
    # make sure the output sub-directory exists on every run
    os.system('mkdir  sqoopjob/hbase-bin')
    sqoopmeta='jdbc:hsqldb:hsql://localhost:16000/sqoop'
    jdbc='jdbc:oracle:thin:@10.90.87.35:11521:bdccdb2 --username sqoopuser --password Bigdata_2016'
    sjf=open('sqoopjob/createjob.sh', "w")
    crontabf=open('./sqoopjob/crontab.cron', 'w')
    df=open('./sqoopjob/deletejob.sh', 'w')
    createtablef=open('./sqoopjob/createtable.sh', 'w')
    kerboros='kinit -kt /keytab/sqoopdba.keytab sqoopdba \n'
    
    with open(tablelist, 'r') as load_f:
        load_dict = json.load(load_f)
        for job in load_dict:
            if job['table_type'].lower()=='dim' and job['partition']=='0':
                #dimension/archive tables
                createjob = '''sqoop  job --create {job_name} --meta-connect {sqoopmeta} \\
                        -- import --connect {jdbc} \\
                        --table {source_table} \\
                        --incremental append \\
                        --split-by {split_by} \\
                        --hbase-create-table \\
                        --hbase-table {hive_table} \\
                        --check-column  {check_column} \\
                        --last-value '2011-01-01 11:00:00' \\
                        --hbase-row-key {row_key} \\
                        --column-family cf \\
                        -m 2;\n'''.format(
                        job_name=job['job_name'].lower(),
                        sqoopmeta=sqoopmeta,
                        jdbc=jdbc,
                        source_table=job['source_table'].upper(),
                        split_by=job['split_by'].upper(),
                        hive_table=job['hive_table'].upper(),
                        check_column=job['datatime_cloumn'].upper(),
                        row_key=job['row_key']
                        ).replace('    ','')
    
                execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                        --exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ','')
    
                deledrop = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                        --delete {job_name};\n'''.format(job_name=job['job_name'].lower() ).replace('    ','')
    
                createtable='''hive -e "use {hive_db};CREATE EXTERNAL TABLE {hive_table}_external(key string,{columns_hive})
                            STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
                            WITH SERDEPROPERTIES ('hbase.columns.mapping' = '{columns_hbase}')
                            TBLPROPERTIES('hbase.table.name' = '{hive_table}',
                            'hbase.mapred.output.outputtable' = '{hive_table}')";\n '''.format(
                                        hive_table=job['hive_table'],
                                        hive_db=job['hive_db'],
                                        columns_hive=' string,'.join(job['columns'].split(','))+' string',
                                        columns_hbase=':key,cf:'+',cf:'.join(job['columns'].split(','))
                                        ).replace('    ','')
                sjf.write(createjob)
                createtablef.write(createtable)
                df.write(deledrop)
                open('./sqoopjob/hbase-bin/' + job['job_name'].lower() + '.sh', 'w').write(execjob)
                crontabf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}.sh >>../logs/{job_name}.out 2>&1\n'''.format(
                    job_name=job['job_name'].lower()))
                execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                        --exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ','')
                open('./sqoopjob/exec_run.sh', 'a').write(execjob)
            else :pass
    
    sjf.close()
    df.close()
    #cf.close()
    print('Script generation finished; see ./sqoopjob/*')
    

    if __name__=='__main__':

    #generate the json file
    json_make(input_file='./hbase_tablelist')
    #generate the sqoop scripts
    create_job(tablelist='./hbase_tablelist.json')
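
    To see what the generated mapping looks like, the createtable template above can be exercised with a made-up job entry (the table and column names below are purely illustrative):

    # print the Hive-on-HBase external table DDL produced for a hypothetical job
    job = {'hive_db': 'fz_bs', 'hive_table': 'S_FAULT_RPT', 'columns': 'APP_NO,FAULT_DESC'}
    createtable = '''hive -e "use {hive_db};CREATE EXTERNAL TABLE {hive_table}_external(key string,{columns_hive})
    STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    WITH SERDEPROPERTIES ('hbase.columns.mapping' = '{columns_hbase}')
    TBLPROPERTIES('hbase.table.name' = '{hive_table}',
    'hbase.mapred.output.outputtable' = '{hive_table}')"'''.format(
        hive_db=job['hive_db'],
        hive_table=job['hive_table'],
        columns_hive=' string,'.join(job['columns'].split(',')) + ' string',
        columns_hbase=':key,cf:' + ',cf:'.join(job['columns'].split(',')))
    print(createtable)

    This maps the HBase row key to the Hive column key and every source column to a cf:<COLUMN> qualifier in the cf column family, matching the --column-family cf used in the sqoop job.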
    
    
