应企业的要求本人一直在研究 大数据技术 ,市面上对于JAVA操作 HDFS 的文档非常之多,在此本人奉上 Python 操作HDFS的代码接口与片段,希望能帮助到某些有此需要的小伙伴们。如有问题请指正,愿同朋友们一并学习,谢谢!废话不多说,干货奉上。
Python操作HDFS数据
直接使用pip 或pycharm的解释器安装(pyhdfs即可)
1. 创建hadoop hdfs集群链接
fs = pyhdfs.HdfsClient(hosts=["hdp88.bigdata.zzyc.cn:50070","hdp87.bigdata.zzyc.cn:50070"],user_name="work",randomize_hosts=True,timeout=20,max_tries=2, retry_delay=5)
参数解释:
hosts:主机名 IP地址与port号之间需要用":"隔开 如:hosts="hdp88.bigdata.zzyc.cn,50070" 多个主机时可以传入list, 如:["hdp88.bigdata.zzyc.cn:50070","hdp87.bigdata.zzyc.cn:50070"]
randomize_hosts:随机选择host进行连接,默认为True
user_name:连接的Hadoop平台的用户名
timeout:每个 namenode 节点连接等待的秒数,默认20sec
max_tries:每个Namenode节点尝试连接的次数,默认2次
retry_delay:在尝试连接一个Namenode节点失败后,尝试连接下一个Namenode的时间间隔,默认5sec
requests_session:连接HDFS的HTTP request请求使用的session,默认为None
2.返回当前用户的根目录
home_directory = fs.get_home_directory()
3.返回可用的namenode
active_namenode=get_active_namenode()
4.遍历目录
fs.listdir(path) path: 可以传入想遍历的目录路径
5.遍历目录查找文件
fs.walk(path) 使用方法同os.walk(path) for root,dir,files in fs.walk(path): pass
6.读取文件内容,返回文件对象
f = fs. open ("/sparklearn.txt") 操作方式如本地文件python操作方式, f. close ,f.data,f.read,f.readline,f.readlines,f.seek
7.上传文件至集群
fs.copy_from_local(src,dst,**kwargs) 例如:fs.copy_from_local("/etc/hosts","/usr/work/hosts") **kwargs 参数可省略
8. 下载集群中文件至本地
fs.copy_to_local("/user/work/ test .txt","/test.txt")
9.向HDFS集群中已存在的文件中追加内容
append (path, data, **kwargs)
fs.append("/user/work/hosts","wxzhostname 10.10.01.01.0")
校验:
f = fs.open("/user/work/hosts")
f.read()
10. 在集群中创建文件
create(path, data, **kwargs) fs.create("/user/work/test.txt","teststststststst\ntttt") 创建文件的同时,需要传入数据
11.合并集群中的多个文件
concat(target, sources, **kwargs) 例如: fs.concat("/user/work/hosts",["/user/work/sparklearn.txt"]) 校验: f = fs.open("/user/work/hosts") f.read()
12.创建目录
mkdirs(path, **kwargs) flag = fs.mkdirs("/user/testdir") 创建成功结果返回True 验证: fs.listdir("/user/")
13. 判断文件是否存在
exists(path, **kwargs) 返回结果: 存在:True 不存在:False
14.获取目录的概览
get_content_summary(path, **kwargs) spaceQuota(int) - 磁盘空间配额。 fileCount(int) - 文件数。 quota(int) - 此目录的命名空间配额。 directoryCount(int) - 目录数。 spaceConsumed(int) - 内容占用的磁盘空间。 length(int) - 内容使用的字节数。 例如: fs.get_content_summary("/user/work") 返回结果:ContentSummary(spaceQuota=-1, length=667827198, directoryCount=16, spaceConsumed=2003481594, quota=-1, fileCount=311)
15.查看文件的校验和
get_file_checksum(path, **kwargs)
例如:
datas =fs.get_file_checksum("/user/work/hosts")
可根据返回字段获取相关的值:
返回结果:
FileChecksum(length=28, bytes =u'00000200000000000000000217aff1b1eabaaa542ef2b824847e6f1000000000', algorithm=u'MD5-of-2MD5-of-512CRC32C')
获取文件大小,MD5值等可直接对象.name的方式获取。
如:datas.length、datas.algorithm、datas.bytes
16. 查看目录或文件的状态
list_status(path, **kwargs) 如: fs.list_status("/user/work/hosts") 返回结果: [FileStatus(group=u'supergroup', permission=u'755', blockSize=268435456, accessTime=1546930708585, pathSuffix=u'', modificationTime=1546931364082, replication=3, length=594, childrenNum=0, owner=u'work', storagePolicy=0, type=u'FILE', fileId=18787)]