多读书多实践,勤思考善领悟

大数据hadoop之 十一.组件HDFS

本文于1722天之前发表,文中内容可能已经过时。

HDFS常用Shell命令

HDFS文件操作

       HDFS是一种文件系统,专为MapReduce这类框架下的大规模分布式数据处理而设计,你可以把一个大数据集(比如说100TB)在HDFS中存储为单个文件,而大多数其他的文件系统无力实现这一点.
       HDFS并不是一个天生的UNIX文件系统,不支持像ls和cp这种标准的UNIX文件命令,也不支持如fopen()和fread()这样的标准文件读写操作.另一方面,Hadoop确也提供了一套与Linux文件命令类似的命令行工具.

基本文件fs命令

Hadoop的文件命令采取的形式为

hadoop fs -cmd <args>

基中cmd是具体的文件命令,而<args>是一组数据可变的参数.cmd的命名通常与unix对应的命令名相同.例如,文件形表命令为

1
2
3
4
5
6
7
8
9
hadoop fs -ls  

hadoop fs -mkdir /user/chuck
hadoop fs -ls /
hadoop fs -put example.txt /user/chuck
hadoop fs -ls
hadoop fs -get example.txt .
hadoop fs -cat example.txt
hadoop fs -rm example.txt

1. 显示当前目录结构

1
2
3
4
5
6
# 显示当前目录结构
hadoop fs -ls <path>
# 递归显示当前目录结构
hadoop fs -ls -R <path>
# 显示根目录下内容
hadoop fs -ls /

2. 创建目录

1
2
3
4
# 创建目录
hadoop fs -mkdir <path>
# 递归创建目录
hadoop fs -mkdir -p <path>

3. 删除操作

1
2
3
4
# 删除文件
hadoop fs -rm <path>
# 递归删除目录和文件
hadoop fs -rm -R <path>

4. 从本地加载文件到HDFS

1
2
3
# 二选一执行即可
hadoop fs -put [localsrc] [dst]
hadoop fs - copyFromLocal [localsrc] [dst]

5. 从HDFS导出文件到本地

1
2
3
# 二选一执行即可
hadoop fs -get [dst] [localsrc]
hadoop fs -copyToLocal [dst] [localsrc]

6. 查看文件内容

1
2
3
# 二选一执行即可
hadoop fs -text <path>
hadoop fs -cat <path>

7. 显示文件的最后一千字节

1
2
3
hadoop fs -tail  <path> 
# 和Linux下一样,会持续监听文件内容变化 并显示文件的最后一千字节
hadoop fs -tail -f <path>

8. 拷贝文件

1
hadoop fs -cp [src] [dst]

9. 移动文件

1
hadoop fs -mv [src] [dst]

10. 统计当前目录下各文件大小

  • 默认单位字节
  • -s : 显示所有文件大小总和,
  • -h : 将以更友好的方式显示文件大小(例如64.0m而不是67108864)
1
hadoop fs -du  <path>

11. 合并下载多个文件

  • -nl 在每个文件的末尾添加换行符(LF)
  • -skip-empty-file 跳过空文件
1
2
3
hadoop fs -getmerge
# 示例 将HDFS上的hbase-policy.xml和hbase-site.xml文件合并后下载到本地的/usr/test.xml
hadoop fs -getmerge -nl /test/hbase-policy.xml /test/hbase-site.xml /usr/test.xml

12. 统计文件系统的可用空间信息

1
hadoop fs -df -h /

13. 更改文件复制因子

1
hadoop fs -setrep [-R] [-w] <numReplicas> <path>
  • 更改文件的复制因子。如果path是目录,则更改其下所有文件的复制因子
  • -w : 请求命令是否等待复制完成
1
2
# 示例
hadoop fs -setrep -w 3 /user/hadoop/dir1

14. 权限控制

1
2
3
4
5
6
7
# 权限控制和Linux上使用方式一致
# 变更文件或目录的所属群组。 用户必须是文件的所有者或超级用户。
hadoop fs -chgrp [-R] GROUP URI [URI ...]
# 修改文件或目录的访问权限 用户必须是文件的所有者或超级用户。
hadoop fs -chmod [-R] <MODE[,MODE]... | OCTALMODE> URI [URI ...]
# 修改文件的拥有者 用户必须是超级用户。
hadoop fs -chown [-R] [OWNER][:[GROUP]] URI [URI ]

15. 文件检测

1
hadoop fs -test - [defsz]  URI

可选选项:

  • -d:如果路径是目录,返回0。
  • -e:如果路径存在,则返回0。
  • -f:如果路径是文件,则返回0。
  • -s:如果路径不为空,则返回0。
  • -r:如果路径存在且授予读权限,则返回0。
  • -w:如果路径存在且授予写入权限,则返回0。
  • -z:如果文件长度为零,则返回0。
1
2
# 示例
hadoop fs -test -e filename

cat命令

1
2
将路径指定文件的内容输出到stdout
hadoop@Master:~$ hadoop dfs -cat input/core-site.xml

chgrp命令

1
改变文件所属组.使用-R将使改变在目录结构下递归进行.命令的使用者必须是文件的所有者或者超级用户.

chmod命令

1
修改文件权限

chown命令

1
改变文件的拥有者

cp命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
将文件从源路径复制到目标路径,这个命令允许有多个源路径,此时目标路径必须是一个目录
hadoop@Master:/usr/local/hadoop/etc/hadoop$ hdfs dfs -cp input/* output/
hadoop@Master:/usr/local/hadoop/etc/hadoop$ hdfs dfs -ls output
Found 11 items
-rw-r--r-- 1 hadoop supergroup 0 2016-01-25 22:14 output/_SUCCESS
-rw-r--r-- 1 hadoop supergroup 4436 2016-01-27 19:16 output/capacity-scheduler.xml
-rw-r--r-- 1 hadoop supergroup 1072 2016-01-27 19:16 output/core-site.xml
-rw-r--r-- 1 hadoop supergroup 9683 2016-01-27 19:16 output/hadoop-policy.xml
-rw-r--r-- 1 hadoop supergroup 1257 2016-01-27 19:16 output/hdfs-site.xml
-rw-r--r-- 1 hadoop supergroup 620 2016-01-27 19:16 output/httpfs-site.xml
-rw-r--r-- 1 hadoop supergroup 3523 2016-01-27 19:16 output/kms-acls.xml
-rw-r--r-- 1 hadoop supergroup 5511 2016-01-27 19:16 output/kms-site.xml
-rw-r--r-- 1 hadoop supergroup 1103 2016-01-27 19:16 output/mapred-site.xml
-rw-r--r-- 1 hadoop supergroup 107 2016-01-25 22:14 output/part-r-00000
-rw-r--r-- 1 hadoop supergroup 924 2016-01-27 19:16 output/yarn-site.xml

du命令

显示目录中所有文件的大小,或者当只指定一个文件时,显示此文件的大小.

1
hadoop@Master:/usr/local/hadoop/etc/hadoop$ hadoop fs -du input/core-site.xml

expunge命令

清空回收站
除了文件权限之外,还有一个保护机制可以防止在HDFS上意外删除文件,这就是回收站,默认情况下该功能是被禁用.当它启用后,用于删除的命令行不会立即删除文件.
相反它们会暂时的把文件移动到用户工作目录下的.Trash文件夹下.若要启用回收站功能并设置清空回收站的时间延迟,可能通过设置core-site.xml的fs.trash.interval属性(以分钟为单位).
例如如果你希望用户有24个小时的时间来还原已删除的文件,就应该在core-site.xml中设置.
如果将该值设置为0,则将禁用回收站的功能

1
2
3
4
<property>
<name>fs.trash.interval</name>
<value>1440</value>
</property>

get命令

复制文件到本地文件系统.

1
hadoop fs -get input/hadoop.tar.gz ~/

lsr命令

ls命令的递归版本,类似于Unix中的ls -R

###mkdir命令

接受路径指定的uri作为参数,创建这些目录,其行为类似于Unix的mkdir -p,它会创建路径中的各级父目录.

1
hadoop fs -mkdir /user/hadoop/dir1 /user/hadoop/dir2

mv命令

将文件从源路径移动到目标路径

1
hadoop fs -mv /user/hadoop/file1 /user/hadoop/file2

put命令

从本地文件系统中复制单个或者多个源路径到目标文件系统.也支持从标准输入中读取输入写入目标文件系统.

1
hadoop fs -put /tmp/*.xml /user/hadoop/

rmr命令

1
hadoop fs -rmr /user/hadoop/chu888chu888

job命令

1
2
3
4
5
6
7
8
* Job操作
* 提交MapReduce Job, Hadoop所有的MapReduce Job都是一个jar包
* $ hadoop jar <local-jar-file> <java-class> <hdfs-input-file> <hdfs-output-dir>
* $ hadoop jar sandbox-mapred-0.0.20.jar sandbox.mapred.WordCountJob /user/cl/input.dat /user/cl/outputdir
*
* 杀死某个正在运行的Job
* 假设Job_Id为:job_201207121738_0001
* $ hadoop job -kill job_201207121738_0001

系统体检

Hadoop提供的文件系统检查工具叫做fsck,如参数为文件路径时,它会递归检查该路径下所有文件的健康状态,如果参数为/,它就会检查整个文件系统,如下输出一个例子.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
hadoop@Master:~$ hadoop fsck /
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

16/01/27 22:55:14 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Connecting to namenode via http://Master:50070
FSCK started by hadoop (auth:SIMPLE) from /192.168.1.80 for path / at Wed Jan 27 22:55:15 CST 2016
.....................Status: HEALTHY
Total size: 878899 B
Total dirs: 21
Total files: 21
Total symlinks: 0
Total blocks (validated): 20 (avg. block size 43944 B)
Minimally replicated blocks: 20 (100.0 %)
Over-replicated blocks: 0 (0.0 %)
Under-replicated blocks: 0 (0.0 %)
Mis-replicated blocks: 0 (0.0 %)
Default replication factor: 1
Average block replication: 1.0
Corrupt blocks: 0
Missing replicas: 0 (0.0 %)
Number of data-nodes: 2
Number of racks: 1
FSCK ended at Wed Jan 27 22:55:15 CST 2016 in 32 milliseconds


The filesystem under path '/' is HEALTHY

HDFS Java API的使用

一、 简介

想要使用HDFS API,需要导入依赖hadoop-client。如果是CDH版本的Hadoop,还需要额外指明其仓库地址:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.heibaiying</groupId>
<artifactId>hdfs-java-api</artifactId>
<version>1.0</version>


<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hadoop.version>2.6.0-cdh5.15.2</hadoop.version>
</properties>


<!---配置CDH仓库地址-->
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>


<dependencies>
<!--Hadoop-client-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>

二、API的使用

2.1 FileSystem

FileSystem是所有HDFS操作的主入口。由于之后的每个单元测试都需要用到它,这里使用@Before注解进行标注。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private static final String HDFS_PATH = "hdfs://192.168.0.106:8020";
private static final String HDFS_USER = "root";
private static FileSystem fileSystem;

@Before
public void prepare() {
try {
Configuration configuration = new Configuration();
// 这里我启动的是单节点的Hadoop,所以副本系数设置为1,默认值为3
configuration.set("dfs.replication", "1");
fileSystem = FileSystem.get(new URI(HDFS_PATH), configuration, HDFS_USER);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (URISyntaxException e) {
e.printStackTrace();
}
}


@After
public void destroy() {
fileSystem = null;
}

2.2 创建目录

支持递归创建目录:

1
2
3
4
@Test
public void mkDir() throws Exception {
fileSystem.mkdirs(new Path("/hdfs-api/test0/"));
}

2.3 创建指定权限的目录

FsPermission(FsAction u, FsAction g, FsAction o) 的三个参数分别对应:创建者权限,同组其他用户权限,其他用户权限,权限值定义在FsAction枚举类中。

1
2
3
4
5
@Test
public void mkDirWithPermission() throws Exception {
fileSystem.mkdirs(new Path("/hdfs-api/test1/"),
new FsPermission(FsAction.READ_WRITE, FsAction.READ, FsAction.READ));
}

2.4 创建文件,并写入内容

1
2
3
4
5
6
7
8
9
10
11
12
@Test
public void create() throws Exception {
// 如果文件存在,默认会覆盖, 可以通过第二个参数进行控制。第三个参数可以控制使用缓冲区的大小
FSDataOutputStream out = fileSystem.create(new Path("/hdfs-api/test/a.txt"),
true, 4096);
out.write("hello hadoop!".getBytes());
out.write("hello spark!".getBytes());
out.write("hello flink!".getBytes());
// 强制将缓冲区中内容刷出
out.flush();
out.close();
}

2.5 判断文件是否存在

1
2
3
4
5
@Test
public void exist() throws Exception {
boolean exists = fileSystem.exists(new Path("/hdfs-api/test/a.txt"));
System.out.println(exists);
}

2.6 查看文件内容

查看小文本文件的内容,直接转换成字符串后输出:

1
2
3
4
5
6
@Test
public void readToString() throws Exception {
FSDataInputStream inputStream = fileSystem.open(new Path("/hdfs-api/test/a.txt"));
String context = inputStreamToString(inputStream, "utf-8");
System.out.println(context);
}

inputStreamToString是一个自定义方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 把输入流转换为指定编码的字符
*
* @param inputStream 输入流
* @param encode 指定编码类型
*/
private static String inputStreamToString(InputStream inputStream, String encode) {
try {
if (encode == null || ("".equals(encode))) {
encode = "utf-8";
}
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, encode));
StringBuilder builder = new StringBuilder();
String str = "";
while ((str = reader.readLine()) != null) {
builder.append(str).append("\n");
}
return builder.toString();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}

2.7 文件重命名

1
2
3
4
5
6
7
@Test
public void rename() throws Exception {
Path oldPath = new Path("/hdfs-api/test/a.txt");
Path newPath = new Path("/hdfs-api/test/b.txt");
boolean result = fileSystem.rename(oldPath, newPath);
System.out.println(result);
}

2.8 删除目录或文件

1
2
3
4
5
6
7
8
9
public void delete() throws Exception {
/*
* 第二个参数代表是否递归删除
* + 如果path是一个目录且递归删除为true, 则删除该目录及其中所有文件;
* + 如果path是一个目录但递归删除为false,则会则抛出异常。
*/
boolean result = fileSystem.delete(new Path("/hdfs-api/test/b.txt"), true);
System.out.println(result);
}

2.9 上传文件到HDFS

1
2
3
4
5
6
7
@Test
public void copyFromLocalFile() throws Exception {
// 如果指定的是目录,则会把目录及其中的文件都复制到指定目录下
Path src = new Path("D:\\BigData-Notes\\notes\\installation");
Path dst = new Path("/hdfs-api/test/");
fileSystem.copyFromLocalFile(src, dst);
}

2.10 上传大文件并显示上传进度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Test
public void copyFromLocalBigFile() throws Exception {

File file = new File("D:\\kafka.tgz");
final float fileSize = file.length();
InputStream in = new BufferedInputStream(new FileInputStream(file));

FSDataOutputStream out = fileSystem.create(new Path("/hdfs-api/test/kafka5.tgz"),
new Progressable() {
long fileCount = 0;

public void progress() {
fileCount++;
// progress方法每上传大约64KB的数据后就会被调用一次
System.out.println("上传进度:" + (fileCount * 64 * 1024 / fileSize) * 100 + " %");
}
});

IOUtils.copyBytes(in, out, 4096);

}

2.11 从HDFS上下载文件

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void copyToLocalFile() throws Exception {
Path src = new Path("/hdfs-api/test/kafka.tgz");
Path dst = new Path("D:\\app\\");
/*
* 第一个参数控制下载完成后是否删除源文件,默认是true,即删除;
* 最后一个参数表示是否将RawLocalFileSystem用作本地文件系统;
* RawLocalFileSystem默认为false,通常情况下可以不设置,
* 但如果你在执行时候抛出NullPointerException异常,则代表你的文件系统与程序可能存在不兼容的情况(window下常见),
* 此时可以将RawLocalFileSystem设置为true
*/
fileSystem.copyToLocalFile(false, src, dst, true);
}

2.12 查看指定目录下所有文件的信息

1
2
3
4
5
6
7
public void listFiles() throws Exception {
FileStatus[] statuses = fileSystem.listStatus(new Path("/hdfs-api"));
for (FileStatus fileStatus : statuses) {
//fileStatus的toString方法被重写过,直接打印可以看到所有信息
System.out.println(fileStatus.toString());
}
}

FileStatus中包含了文件的基本信息,比如文件路径,是否是文件夹,修改时间,访问时间,所有者,所属组,文件权限,是否是符号链接等,输出内容示例如下:

1
2
3
4
5
6
7
8
9
10
FileStatus{
path=hdfs://192.168.0.106:8020/hdfs-api/test;
isDirectory=true;
modification_time=1556680796191;
access_time=0;
owner=root;
group=supergroup;
permission=rwxr-xr-x;
isSymlink=false
}

2.13 递归查看指定目录下所有文件的信息

1
2
3
4
5
6
7
@Test
public void listFilesRecursive() throws Exception {
RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(new Path("/hbase"), true);
while (files.hasNext()) {
System.out.println(files.next());
}
}

和上面输出类似,只是多了文本大小,副本系数,块大小信息。

1
2
3
4
5
6
7
8
9
10
11
LocatedFileStatus{
path=hdfs://192.168.0.106:8020/hbase/hbase.version;
isDirectory=false;
length=7;
replication=1;
blocksize=134217728;
modification_time=1554129052916;
access_time=1554902661455;
owner=root; group=supergroup;
permission=rw-r--r--;
isSymlink=false}

2.14 查看文件的块信息

1
2
3
4
5
6
7
8
9
@Test
public void getFileBlockLocations() throws Exception {

FileStatus fileStatus = fileSystem.getFileStatus(new Path("/hdfs-api/test/kafka.tgz"));
BlockLocation[] blocks = fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
for (BlockLocation block : blocks) {
System.out.println(block);
}
}

块输出信息有三个值,分别是文件的起始偏移量(offset),文件大小(length),块所在的主机名(hosts)。

1
0,57028557,hadoop001

以上所有测试用例下载地址HDFS Java API