您的位置 首页 java

Spring Boot与Hadoop HDFS分布式文件系统

Spring Boot与Hadoop HDFS分布式文件系统

一、HDFS概念

HDFS属于Hadoop大数据生态圈,最早是由谷歌GFS论文中引申出来的概念,全名叫做Hadoop Distributed File System。也就是分布式文件系统,按区域进行存储数据。

Spring Boot与Hadoop HDFS分布式文件系统

1.1HDFS适用场景

主要特点是高容错性、大文件存储、高吞吐量。

Spring Boot与Hadoop HDFS分布式文件系统

1.2HDFS缺点

由于HDFS是由高数据吞吐量优化的,会造成低时间数据访问延迟;

会造成用户量多任意修改文件;

会造成非常多的小文件,磁盘空间任意消耗。

Spring Boot与Hadoop HDFS分布式文件系统

1.3HDFS组成部分

NameNode:是一个元数据,支持存储、生成文件系统;

Client:支持客户端得到数据回传给业务;

DataNode:是HDFS中用于存储实际数据的部分,上报数据给NameNode,支持多个实例。

Spring Boot与Hadoop HDFS分布式文件系统

1.4HDFS支持的数据访问格式

可支持WebHDFS和HTTPFS,本文将以Spring Boot操作HTTPFS来说明Spring Boot操作HDFS形式。

Spring Boot与Hadoop HDFS分布式文件系统

二、HDFS 支持Java API主要工具类

[A]用于获取服务器和客户端配置信息的Configuration;

[B]用于生成获取文件系统的FileSystem;

[C]HDFS中的数据输入流,用于FileSystem的open方法的FSDataInputStream;

[D]HDFS的数据输出流,由FileSystem Create方法生成的FSDataOutputStream。

Spring Boot与Hadoop HDFS分布式文件系统

三、开启Spring Boot项目支持HDFS

首先引入Spring Boot工程的父模块依赖,组成Spring Boot工程:

<parent>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-parent</artifactId>

<version>2.1.6.RELEASE</version>

<relativePath />

</parent>

引入Spring Boot组件:

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-web</artifactId>

</dependency>

引入Hadoop工具包:

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-common</artifactId>

<version>3.1.1</version>

</dependency>

引入Hadoop HDFS支撑组件:

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-hdfs</artifactId>

<version>3.1.1</version>

</dependency>

引入Hadoop客户端工具:

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-client</artifactId>

<version>3.1.1</version>

</dependency>

引入MR支持组件:

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-mapreduce-client-core</artifactId>

<version>3.1.1</version>

</dependency>

构建Maven工程:

<build>

<plugins>

<plugin>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-maven-plugin</artifactId>

</plugin>

<plugin>

<groupId>org.apache.maven.plugins</groupId>

<artifactId>maven-compiler-plugin</artifactId>

<configuration>

<source>1.8</source>

<target>1.8</target>

</configuration>

</plugin>

</plugins>

</build>

主要启动文件bootstrap.yml配置:

hadoop.name-node: hdfs://192.168.1.82:9000

hadoop.namespace: /myora

# hadoop:

# fs-uri: hdfs://192.168.1.82:9000

Spring Boot与Hadoop HDFS分布式文件系统

3.1编写HDFS工具类

3.1.1获取文件系统

public class HdfsHutools {

/**

* 获取文件系统

* @return

*/

public static FileSystem getFileSystem(String hdfsClientUri) {

//读取配置文件

Configuration conf = new Configuration();

// 文件系统

FileSystem fs = null;

if(StringUtils.isBlank(hdfsClientUri)){

// 返回默认文件系统 如果在 Hadoop集群下运行,使用此种方法可直接获取默认文件系统

try {

fs = FileSystem.get(conf);

} catch (Exception e) {

System.out.print(“异常”+ e.toString());

}

}else{

// 返回指定的文件系统,如果在本地测试,需要使用此种方法获取文件系统

try {

URI uri = new URI(hdfsClientUri.trim());

fs = FileSystem.get(uri,conf);

} catch (Exception e) {

System.out.print(“异常”+ e.toString());

}

}

return fs;

}

Spring Boot与Hadoop HDFS分布式文件系统

3.1.2创建HDFS文件目录

public static void mkdir(String hdfsCreateUri, String path) {

try {

// 获取HDFS文件系统

FileSystem fs = getFileSystem(hdfsCreateUri);

if(StringUtils.isNotBlank(hdfsCreateUri)){

path = hdfsCreateUri+ path;

}

// 创建目录

fs.mkdirs(new Path(path));

//释放资源

fs.close();

} catch (Exception e) {

System.out.print(“异常”+ e.toString());

}

}

Spring Boot与Hadoop HDFS分布式文件系统

3.1.3删除HDFS文件目录

public static void rmdir(String hdfsCreateUri,String path) {

try {

// 返回FileSystem对象

FileSystem fs = getFileSystem(hdfsCreateUri);

if(StringUtils.isNotBlank(hdfsCreateUri)){

path = hdfsCreateUri+ path;

}

// 删除文件或者文件目录 delete(Path f) 此方法已经弃用

fs.delete(new Path(path),true);

// 释放资源

fs.close();

} catch (Exception e) {

System.out.print(“异常”+ e.toString());

}

}

Spring Boot与Hadoop HDFS分布式文件系统

3.1.4过滤掉HDFS目录下的文件

public static String[] listFile(String hdfsCreateUri, String path,PathFilter pathFilter) {

String[] files = new String[0];

try {

// 返回FileSystem对象

FileSystem fs = getFileSystem(hdfsCreateUri);

if(StringUtils.isNotBlank(hdfsCreateUri)){

path = hdfsCreateUri+ path;

}

FileStatus[] status;

if(pathFilter != null){

// 根据filter列出目录内容

status = fs.listStatus(new Path(path),pathFilter);

}else{

// 列出目录内容

status = fs.listStatus(new Path(path));

}

// 获取目录下的所有文件路径

Path[] listedPaths = FileUtil.stat2Paths(status);

// 转换String[]

if (listedPaths != null && listedPaths.length > 0){

files = new String[listedPaths.length];

for (int i = 0; i < files.length; i++){

files[i] = listedPaths[i].toString();

}

}

// 释放资源

fs.close();

} catch (Exception e) {

System.out.print(“异常”+ e.toString());

}

return files;

}

Spring Boot与Hadoop HDFS分布式文件系统

3.1.5上传文件至HDFS

public static void copyFileToHDFS(String hdfsCreateUri,boolean delSrc, boolean overwrite,String srcFile,String destPath) {

Path srcPath = new Path(srcFile);

// 目的路径

if(StringUtils.isNotBlank(hdfsCreateUri)){

destPath = hdfsCreateUri+ destPath;

}

Path dstPath = new Path(destPath);

// 实现文件上传

try {

// 获取FileSystem对象

FileSystem fs = getFileSystem(hdfsCreateUri);

fs.copyFromLocalFile(srcPath, dstPath);

fs.copyFromLocalFile(delSrc,overwrite,srcPath, dstPath);

//释放资源

fs.close();

} catch (Exception e) {

System.out.print(“异常”+ e.toString());

}

}

Spring Boot与Hadoop HDFS分布式文件系统

3.1.6从HDFS下载文件

public static void getFile(String hdfsCreateUri, String srcFile,String destPath) {

// 源文件路径

if(StringUtils.isNotBlank(hdfsCreateUri)){

srcFile = hdfsCreateUri+ srcFile;

}

Path srcPath = new Path(srcFile);

Path dstPath = new Path(destPath);

try {

// 获取FileSystem对象

FileSystem fs = getFileSystem(hdfsCreateUri);

// 下载hdfs上的文件

fs.copyToLocalFile(srcPath, dstPath);

// 释放资源

fs.close();

} catch (Exception e) {

System.out.print(“异常”+ e.toString());

}

}

Spring Boot与Hadoop HDFS分布式文件系统

3.1.7获取HDFS集群节点信息

public static DatanodeInfo[] getHDFSNodes(String hdfsCreateUri) {

// 获取所有节点

DatanodeInfo[] dataNodeStats = new DatanodeInfo[0];

try {

// 返回FileSystem对象

FileSystem fs = getFileSystem(hdfsCreateUri);

// 获取分布式文件系统

DistributedFileSystem hdfs = (DistributedFileSystem)fs;

dataNodeStats = hdfs.getDataNodeStats();

} catch (Exception e) {

System.out.print(“异常”+ e.toString());

}

return dataNodeStats;

}

Spring Boot与Hadoop HDFS分布式文件系统

3.1.8获取文件位置

public static BlockLocation[] getFileBlockLocations(String hdfsCreateUri, String filePath) {

// 文件路径

if(StringUtils.isNotBlank(hdfsCreateUri)){

filePath = hdfsCreateUri+ filePath;

}

Path path = new Path(filePath);

// 文件块位置列表

BlockLocation[] blkLocations = new BlockLocation[0];

try {

// 返回FileSystem对象

FileSystem fs = getFileSystem(hdfsCreateUri);

// 获取文件目录

FileStatus filestatus = fs.getFileStatus(path);

//获取文件块位置列表

blkLocations = fs.getFileBlockLocations(filestatus, 0, filestatus.getLen());

} catch (Exception e) {

System.out.print(“异常”+ e.toString());

}

return blkLocations;

}

Spring Boot与Hadoop HDFS分布式文件系统

3.1.9判断HDFS目录是否存在

public boolean existDir(String hdfsCreateUri,String filePath, boolean create){

boolean flag = false;

if (StringUtils.isEmpty(filePath)){

return flag;

}

try{

Path path = new Path(filePath);

// FileSystem对象

FileSystem fs = getFileSystem(hdfsCreateUri);

if (create){

if (!fs.exists(path)){

fs.mkdirs(path);

}

}

if (fs.isDirectory(path)){

flag = true;

}

}catch (Exception e){

System.out.print(“异常”+ e.toString());

}

return flag;

}

}

Spring Boot与Hadoop HDFS分布式文件系统

3.2HDFS的Spring Boot配置

@Configuration

@ConditionalOnProperty(name=”hadoop.name-node”)

@Slf4j

public class HDFSClientConfig {

@Value(“${hadoop.name-node}”)

private String nameNode;

@Bean(“fileSystem”)

public FileSystem createHDFSFile() {

//读取HDFS Config

Configuration conf = new Configuration();

conf.set(“fs.defalutFS”, nameNode);

conf.set(“dfs.replication”, “1”);

FileSystem fs = null;

//conf.set(“fs.defaultFS”,”hdfs://ns1″);

//指定访问HDFS的client信息

//fs = FileSystem.get(new URI(nameNode), conf, “root”);

// 文件HDFS系统

try {

URI uri = new URI(nameNode.trim());

fs = FileSystem.get(uri,conf,”root”);

} catch (Exception e) {

System.out.println(e.toString());

}

System.out.println(“fs.defaultFS: “+conf.get(“fs.defaultFS”));

return fs;

}

Spring Boot与Hadoop HDFS分布式文件系统

3.3Restful格式Controller代码示例

删除HDFS文件:

@GetMapping(“/delete”)

public String delete(@RequestParam String fileName){

hadoopUtil.rmdir(fileName,””);

return “delete”;

}

下载HDFS文件:

@GetMapping(“/downloadToDest”)

public String downloadToDest(@RequestParam String fileName,@RequestParam String savePath){

hadoopUtil.getFile(fileName,””,savePath);

return “downloadToDest”;

}

Spring Boot与Hadoop HDFS分布式文件系统

四、Spring Boot后续

Spring Boot本人水平也有限,如有不对,还望指正。

文章来源:智云一二三科技

文章标题:Spring Boot与Hadoop HDFS分布式文件系统

文章地址:https://www.zhihuclub.com/185810.shtml

关于作者: 智云科技

热门文章

发表回复

您的电子邮箱地址不会被公开。

网站地图