pyhdfs模块是Python提供的第三方库模块,它提供了直接对Hadoop中HDFS操作的能力,pyhdfs模块是HDFS的API和命令行接口。
1.安装PyHDFS库
在使用pyhdfs模块需要安装PyHDFS,在Python中所有的第三方模块均采用pip安装。
在Windows下使用pip安装模块有以下两种方式。
(1)命令行安装方式:运行→cmd→pip install pyhdfs,如图1所示。
图1 命令行安装方式安装pyhdfs模块
PyCharm安装方式:在Terminal界面输入pip install pyhdfs命令,如图2所示。
图2 PyCharm安装方式安装pyhdfs模块
验证PyHDFS是否安装成功:在控制台终端输入pip list,如果查看到安装的pyhdfs模块则说明安装成功,如图3所示。
图3 查看安装结果
2.连接HDFS
pyhdfs模块中的HdfsClient类非常关键。使用这个类可以实现连接HDFS的NameNode,对HDFS上的文件进行查、读、写操作等功能。
参数解析如下。
hosts:主机名或IP地址,与port之间需要逗号隔开,如:hosts="192.168.153.101:9000",支持高可用集群,例如:["192.168.153.101,9000","192.168.153.102,9000"]。
randomize_hosts:随机选择host进行连接,默认为True。
user_name:连接的Hadoop平台的用户名。
timeout:每个NameNode节点连接等待的秒数,默认为20sec。
max_tries:每个NameNode节点尝试连接的次数,默认为2次。
retry_delay:在尝试连接一个NameNode节点失败后,尝试连接下一个NameNode的时间间隔,默认为5s。
requests_session:设置连接HDFS的HTTPRequest请求的模式为session。
代码示例如下:
import pyhdfs
class HDFSTest2:
def __init__(self):
self.client = pyhdfs.HdfsClient(hosts="192.168.153.101,50070", user_name="hadoop")
def test(self):
print(self.client)
if __name__ == '__main__':
h = HDFSTest2()
h.test()
结果如图4所示。
图4 输出结果
注意,在Windons下使用PyCharm中pyhdfs模块连接HDFS时,需要设置Windons中的hosts文件,否则将无法在连接集群里根据节点名字找到从节点,如图5所示。
图5 设置hosts文件
3.get_home_directory()函数
get_home_directory()函数用于返回所连接集群的根目录。
get_home_directory()函数示例如下:
class HDFSTest2:
# 获取对HDFS操作的对象
def __init__(self):
self.client = pyhdfs.HdfsClient(hosts="192.168.153.101, 50070", user_name="hadoop")
# 返回这个用户的根目录
def get_home_directory(self):
c = self.client
print(c.get_home_directory())
if __name__ == '__main__':
h = HDFSTest2()
h.get_home_directory()
结果如图6所示。
图6 get_home_directory()函数示例结果
4.get_active_namenode()函数
get_active_namenode()函数用于返回当前活动的NameNode的地址。
get_active_namenode()函数示例如下:
class HDFSTest2:
# 获取对HDFS操作的对象
def __init__(self):
self.client = pyhdfs.HdfsClient(hosts="192.168.153.101,50070", user_name="hadoop")
# 返回可用的NameNode节点
def get_active_namenode(self):
c = self.client
nameNode = c.get_active_namenode()
print(nameNode)
if __name__ == '__main__':
h = HDFSTest2()
h.get_active_namenode()
结果如图7所示。
图7 get_active_namenode()函数示例结果
5.listdir()函数
listdir()函数用于返回指定目录下的所有文件,由于PyHDFS可以设置访问的用户,因此在操作HDFS中的文件时不需要设置其他用户的权限。listdir函数中path参数是指定的HDFS路径。
listdir()函数示例如下:
class HDFSTest2:
# 获取对HDFS操作的对象
def __init__(self):
self.client = pyhdfs.HdfsClient(hosts="192.168.153.101,50070", user_name="hadoop")
# 查询HDFS中根目录下的所有文件
def listdir(self, hdfsPath):
c = self.client
dir = c.listdir(path=hdfsPath)
for d in dir:
print(d, end=" ")
if __name__ == '__main__':
h = HDFSTest2()
h.listdir("/")
结果如图8所示。
图8 listdir()函数示例结果
6.open()函数
open()函数用于远程打开HDFS中的文件,返回IO[bytes]类型,利用read()函数读取指定文件的数据返回AnyStr类型。
open()函数示例如下:
class HDFSTest2:
# 获取对HDFS操作的对象
def __init__(self):
self.client = pyhdfs.HdfsClient(hosts="192.168.153.101,50070", user_name="hadoop")
# 打开hdfs中文件
def open(self, filDir):
c = self.client
file = c.open(path=filDir)
print(file.read().decode(encoding="utf8"))
if __name__ == '__main__':
h = HDFSTest2()
h.open("/input/data")
结果如图9所示。
图9 open()函数示例结果
7.copy_from_local()函数
copy_from_local()函数用于从本地上传文件到集群,接收两个参数:localsrc参数用于设置本地文件路径;dest用于设置HDFS中文件路径,如果dest参数对应的路径不存在将创建一个新路径。
copy_from_local()函数示例如下:
class HDFSTest2:
# 获取对HDFS操作的对象
def __init__(self):
self.client = pyhdfs.HdfsClient(hosts="192.168.153.101,50070", user_name="hadoop")
# 从本地上传文件至集群
def copy_from_local(self, local, hdfsPath):
c = self.client
c.copy_from_local(localsrc=local, dest=hdfsPath)
if __name__ == '__main__':
h = HDFSTest2()
h.copy_from_local("D:/tmp/test.txt", "/input/dd/newTest")
输出结果:
[hadoop@Slave003 ~]$ hadoop fs -ls /input
-rw-r--r-- 3 hadoop supergroup 77 2020-07-23 04:20 /input/data
[hadoop@Slave003 ~]$ hadoop fs -ls /input
-rw-r--r-- 3 hadoop supergroup 77 2020-07-23 04:20 /input/data
-rwxr-xr-x 3 hadoop supergroup 10 2020-07-24 06:12 /input/newTest
[hadoop@Slave003 ~]$ hadoop fs -ls -R /input
-rw-r--r-- 3 hadoop supergroup 77 2020-07-23 04:20 /input/data
drwxr-xr-x - hadoop supergroup 0 2020-07-24 06:14 /input/dd
-rwxr-xr-x 3 hadoop supergroup 10 2020-07-24 06:14 /input/dd/newTest
-rwxr-xr-x 3 hadoop supergroup 10 2020-07-24 06:12 /input/newTest
8.copy_to_local()函数
copy_to_local()函数用于从集群的HDFS中下载文件到本地,接收两个参数:src参数为hdfs中的文件路径;localdest参数为本地的文件存储路径。
copy_to_local()函数示例如下:
class HDFSTest2:
# 获取对HDFS操作的对象
def __init__(self):
self.client = pyhdfs.HdfsClient(hosts="192.168.153.101,50070", user_name="hadoop")
# 从集群下载文件到本地
def copy_to_local(self, hdfsPath, local):
c = self.client
c.copy_to_local(src=hdfsPath, localdest=local)
if __name__ == '__main__':
h = HDFSTest2()
h.copy_to_local("/input/data", "D:/tmp")
9.mkdirs()函数
mkdirs()函数用于在集群的HDFS中创建新目录,path参数用于传入需要创建的路径。
mkdirs()函数示例如下:
class HDFSTest2:
# 获取对HDFS操作的对象
def __init__(self):
self.client = pyhdfs.HdfsClient(hosts="192.168.153.101,50070", user_name="hadoop")
# 创建新目录
def mkdirs(self, hdfsPath):
c = self.client
c.mkdirs(path=hdfsPath)
if __name__ == '__main__':
h = HDFSTest2()
h.mkdirs("/input/tmp")
结果如图10所示。
图10 mkdirs()函数示例结果
10.exists()函数
exists()函数用于查看指定的文件或目录是否存在,如果存在则返回True,否则返回False。
exists()函数示例如下:
class HDFSTest2:
# 获取对HDFS操作的对象
def __init__(self):
self.client = pyhdfs.HdfsClient(hosts="192.168.153.101,50070", user_name="hadoop")
# 查看文件是否存在
def exists(self, hdfsPath):
c = self.client
result = c.exists(path=hdfsPath)
print("结果:", result)
if __name__ == '__main__':
h = HDFSTest2()
h.exists("/input")
结果如图11所示。
图11 exists()函数示例结果
11.get_file_status()函数
get_file_status()函数用于返回指定HDFS路径的路径对象,path参数为HDFS文件或目录路径。
get_file_status函数示例如下:
class HDFSTest2:
# 获取对HDFS操作的对象
def __init__(self):
self.client = pyhdfs.HdfsClient(hosts="192.168.153.101,50070", user_name="hadoop")
def get_file_status(self, hdfsPath):
c = self.client
status = c.get_file_status(hdfsPath)
print(status["type"])
if status["type"] == "DIRECTORY":
print(f"{hdfsPath}:该文件是目录!")
elif status["type"] == "FILE":
print(f"{hdfsPath}:该文件是文件!")
if __name__ == '__main__':
h = HDFSTest2()
h.get_file_status("/input/data")
结果如图12所示。
图12 运行结果
12.delete()函数
delete()函数用于删除HDFS文件函数,该函数只能删除文件或者空目录,如果删除的目录下有文件的目录,将抛出HdfsPathIsNotEmptyDirectoryException异常。
delete()函数示例如下:
class HDFSTest2:
# 获取对HDFS操作的对象
def __init__(self):
self.client = pyhdfs.HdfsClient(hosts="192.168.153.101,50070", user_name="hadoop")
def delete(self):
c = self.client
c.delete("/input")
if __name__ == '__main__':
h = HDFSTest2()
h.delet