Python3 操作 HDFS

发布时间:2019-09-28 08:39:10编辑:auto阅读(1881)

    【第三方包】


    【功能】

    • 重命名 hdfs 文件或目录

    # encoding: utf-8
    # author: walker
    # date: 2018-03-17 
    # summary: 利用 pyhdfs 重命名 hdfs 文件或目录
    
    import os, sys, time
    from pyhdfs import HdfsClient
    
    SrcPath = '/test/xxx'
    DstPath = '/test/yyy'
    NameNode = 'nn1.example.com:50070,nn2.example.com:50070'
    
    
    # 将 SrcPath 改名为 DstPath
    def Rename(SrcPath, DstPath):
    	fs = HdfsClient(hosts=NameNode)
    	if not fs.exists(SrcPath):
    		print('Error: not found %s' % SrcPath)
    		sys.exit(-1)
    		
    	print('Reanme ... \n%s\n -> \n%s \n' % (SrcPath, DstPath))
    	
    	fs.rename(SrcPath, DstPath)
    	
    	
    if __name__ == '__main__':
    	Rename(SrcPath, DstPath)
    • 上传文件

    # encoding: utf-8
    # author: walker
    # date: 2018-01-23
    # summary: 上传本地文件到 hdfs 目录
    
    import os, sys, time
    from pyhdfs import HdfsClient
    from configparser import ConfigParser
    
    cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))
    StartTime = time.time()
    FileSize = 0		#文件总大小
    
    LocalDir = ''
    HdfsDir = ''
    NameNode = ''
    UserName = ''
    
    #读取配置文件	
    def ReadConfig():
    	global LocalDir, HdfsDir, NameNode, UserName
    
    	cfg = ConfigParser()
    	cfgFile = os.path.join(cur_dir_fullpath, 'config.ini')
    	if not os.path.exists(cfgFile):
    		input(cfgFile + ' not found')
    		sys.exit(-1)
    	cfgLst = cfg.read(cfgFile)
    	if len(cfgLst) < 1:
    		input('Read config.ini failed...')
    		sys.exit(-1)
    
    	
    	LocalDir = cfg.get('config', 'LocalDir').strip()   
    	if not os.path.exists(LocalDir):
    		input(LocalDir + ' not found')
    		sys.exit(-1)
    	print('LocalDir:' + LocalDir)
    	
    	HdfsDir = cfg.get('config', 'HdfsDir').strip() 
    	print('HdfsDir:' + HdfsDir)	
    	
    	NameNode = cfg.get('config', 'NameNode').strip() 
    	print('NameNode:' + NameNode)	
    
    	UserName = cfg.get('config', 'UserName').strip() 
    	print('UserName:' + UserName)	
    	
    	print('Read config.ini successed!')
    	
    #处理一个
    def ProcOne(client, srcFile, dstFile):
    	global FileSize
    	print('ProcOne \n%s\n -> \n%s ' % (srcFile, dstFile))
    	
    	#目标文件已经存在且大小相同
    	if client.exists(dstFile) and \
    		(os.path.getsize(srcFile) == client.list_status(dstFile)[0].length):
    		print('file exists: %s ' % dstFile)
    		return True
    	
    	#注意,如果已存在会被覆盖
    	client.copy_from_local(srcFile, dstFile, overwrite=True)	
    	
    	#校验文件大小
    	if os.path.getsize(srcFile) == client.list_status(dstFile)[0].length:	
    		FileSize += os.path.getsize(srcFile)
    		return True
    		
    	return False
    	
    #处理所有
    def ProcAll():	
    	client = HdfsClient(hosts=NameNode, user_name=UserName)
    	if not client.exists(HdfsDir):
    		print(HdfsDir + ' not found')
    		sys.exit(-1)	
    	total = len(os.listdir(LocalDir))
    	processed = 0
    	failedList = list()
    	for filename in os.listdir(LocalDir):
    		srcFile = os.path.join(LocalDir, filename)
    		dstFile = HdfsDir + '/' + filename
    		if not ProcOne(client, srcFile, dstFile):
    			failedList.append(srcFile)
    		processed += 1		
    		print('%d/%d/%d, time cost: %.2f s' % (total, processed, len(failedList), time.time()-StartTime))
    		print('%d B, %.2f MB/s \n' % (FileSize, FileSize/1024/1024/(time.time()-StartTime)))
    	
    	if failedList:
    		print('failedList: %s' % repr(failedList))
    	else:
    		print('Good! No Error!')
    		print('%d B, %.2f MB, %.2f GB, %.2f MB/s' % \
                (FileSize, FileSize/1024/1024, FileSize/1024/1024/1024, FileSize/1024/1024/(time.time()-StartTime)))
    	
    if __name__ == '__main__':
    	ReadConfig()
    	ProcAll()
    	print('Time total: %.2f s' % (time.time()-StartTime))
    	print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()))
    • 下载 HDFS 文件到本地

    # encoding: utf-8
    # author: walker
    # date: 2018-06-07
    # summary: 下载 HDFS 文件(或目录)到本地
    
    import os, sys, time
    from pyhdfs import HdfsClient
    from configparser import ConfigParser
    
    cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))
    StartTime = time.time()
    FileSize = 0        #文件总大小
    
    LocalDir = ''
    HdfsDir = ''
    NameNode = ''
    UserName = ''
    
    #读取配置文件 
    def ReadConfig():
        global LocalDir, HdfsDir, NameNode, UserName
    
        cfg = ConfigParser()
        cfgFile = os.path.join(cur_dir_fullpath, 'config.ini')
        if not os.path.exists(cfgFile):
            input(cfgFile + ' not found')
            sys.exit(-1)
        cfgLst = cfg.read(cfgFile)
        if len(cfgLst) < 1:
            input('Read config.ini failed...')
            sys.exit(-1)
        
        LocalDir = cfg.get('config', 'LocalDir').strip()
        if not os.path.exists(LocalDir):
            input(LocalDir + ' not found')
            sys.exit(-1)
        print('LocalDir:' + LocalDir)
        
        HdfsDir = cfg.get('config', 'HdfsDir').strip().rstrip('/')
        print('HdfsDir:' + HdfsDir) 
        
        NameNode = cfg.get('config', 'NameNode').strip()
        print('NameNode:' + NameNode)   
    
        UserName = cfg.get('config', 'UserName').strip()
        print('UserName:' + UserName)   
        
        print('Read config.ini successed!')
        
    #处理一个
    def ProcOne(client, srcFile, dstFile):
        global FileSize
        print('ProcOne \n%s\n -> \n%s ' % (srcFile, dstFile))
    
        dstDir = os.path.dirname(dstFile)
        if not os.path.exists(dstDir):
            os.makedirs(dstDir)
        
        # 目标文件已经存在且大小相同
        if os.path.exists(dstFile) and \
            (os.path.getsize(dstFile) == client.list_status(srcFile)[0].length):
            print('file exists: %s ' % dstFile)
            return True
        
        # 注意,如果已存在会被覆盖
        client.copy_to_local(srcFile, dstFile, overwrite=True)
        
        if os.path.getsize(dstFile) != client.list_status(srcFile)[0].length:   #校验文件大小
            return False
    
        FileSize += os.path.getsize(dstFile)
        return True
        
    #处理所有
    def ProcAll():  
        client = HdfsClient(hosts=NameNode, user_name=UserName)
        if not client.exists(HdfsDir):
            print(HdfsDir + ' not found')
            sys.exit(-1)    
            
        total = 0
        # 先遍历一遍,得到总文件个数
        for parent, dirnames, filenames in client.walk(HdfsDir):
            for filename in filenames:
                total += 1
        processed = 0
        failedList = list()
    
        for parent, dirnames, filenames in client.walk(HdfsDir):
            for filename in filenames:
                srcFile = '%s/%s' % (parent, filename)
                relPath = srcFile[len(HdfsDir)+1:].replace('/', '\\')   # 相对于根目录的路径
                dstFile = os.path.join(LocalDir, relPath)
                if not ProcOne(client, srcFile, dstFile):
                    failedList.append(srcFile)
                processed += 1      
                print('%d/%d/%d, time cost: %.2f s' % (total, processed, len(failedList), time.time()-StartTime))
                print('%d B, %.2f MB/s \n' % (FileSize, FileSize/1024/1024/(time.time()-StartTime)))
        
        if failedList:
            print('failedList: %s' % repr(failedList))
        else:
            print('Good! No Error!')
            print('%d B, %.2f MB, %.2f GB, %.2f MB/s' % \
    (FileSize, FileSize/1024/1024, FileSize/1024/1024/1024, FileSize/1024/1024/(time.time()-StartTime)))
        
    if __name__ == '__main__':
        ReadConfig()
        ProcAll()
        print('Time total: %.2f s' % (time.time()-StartTime))
        print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()))


    *** walker ***




关键字