I. Objectives
1) Understand the role of HDFS in the Hadoop architecture.
2) Become proficient with the common shell commands for operating HDFS.
3) Become familiar with the common Java APIs for operating HDFS.
II. Platform
1) Operating system: Linux (Ubuntu 18.04);
2) Hadoop version: 2.9.0;
3) JDK version: 1.8;
4) Java IDE: Eclipse 3.8.
III. Tasks
Implement each of the following functions in Java, and complete the same task with the Hadoop shell commands:
1) Upload any text file to HDFS; if the target file already exists in HDFS, let the user choose whether to append to the end of the existing file or to overwrite it;
package lab1;
import java.io.*;
import java.util.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

public class test1 {
    // Upload srcPath to desPath; if desPath already exists, let the user
    // choose between overwriting it and appending to its end.
    public static void test1(FileSystem hdfs, Path srcPath, Path desPath) {
        try {
            if (hdfs.exists(desPath)) {
                System.out.println(desPath + " already exists");
                System.out.println("Overwrite (y) or append to the end (n)? (y/n)");
                if (new Scanner(System.in).next().equals("y")) {
                    hdfs.copyFromLocalFile(srcPath, desPath);
                    System.out.println("Overwritten");
                } else {
                    // Stream the local file onto the end of the HDFS file.
                    // Note: on a single-node cluster, append may also require
                    // dfs.client.block.write.replace-datanode-on-failure.policy=NEVER.
                    FileInputStream inputStream = new FileInputStream(srcPath.toString());
                    FSDataOutputStream out = hdfs.append(desPath);
                    byte[] bytes = new byte[1024];
                    int read;
                    while ((read = inputStream.read(bytes)) > 0) {
                        out.write(bytes, 0, read);
                    }
                    inputStream.close();
                    out.close();
                    System.out.println("Appended");
                }
            } else {
                hdfs.copyFromLocalFile(srcPath, desPath);
                System.out.println("Uploaded");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        FileSystem fs = FileSystem.get(conf);
        Path path1 = new Path("./src/lab1/test.txt");
        Path path2 = new Path("hdfs://localhost:9000/test/test.txt");
        test1(fs, path1, path2);
    }
}
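Equivalent shell commands (a sketch, reusing the paths from the Java code): -test -e checks whether the target exists, and the upload, overwrite, and append cases then map onto -copyFromLocal and -appendToFile:
$ hdfs dfs -test -e /test/test.txt                               # exit code 0 if the file exists
$ hdfs dfs -copyFromLocal ./src/lab1/test.txt /test/test.txt     # plain upload when it does not
$ hdfs dfs -copyFromLocal -f ./src/lab1/test.txt /test/test.txt  # -f overwrites an existing file
$ hdfs dfs -appendToFile ./src/lab1/test.txt /test/test.txt      # append to the end instead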
2) Download a specified file from HDFS; if a local file with the same name already exists, automatically rename the downloaded file;
package lab1;
import java.io.*;
import java.util.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

public class test2 {
    // Download remotePath to localPath; if a local file with the same name
    // already exists, rename the downloaded copy automatically.
    // Note: copyToLocalFile silently overwrites an existing local file rather
    // than throwing FileAlreadyExistsException, so the name clash has to be
    // detected with java.io.File before downloading.
    private static void test2(FileSystem hdfs, Path remotePath, Path localPath) {
        try {
            if (!hdfs.exists(remotePath)) {
                System.out.println("No such file in HDFS");
                return;
            }
            if (new File(localPath.toString()).exists()) {
                Path renamed = new Path(localPath.toString() + "_" + new Random().nextInt(10000));
                System.out.println(localPath + " already exists locally; saving as " + renamed);
                hdfs.copyToLocalFile(remotePath, renamed);
            } else {
                hdfs.copyToLocalFile(remotePath, localPath);
            }
            System.out.println("Downloaded");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        FileSystem fs = FileSystem.get(conf);
        Path path1 = new Path("hdfs://localhost:9000/test/test.txt");
        Path path2 = new Path("./src/lab1/test2.txt");
        test2(fs, path1, path2);
    }
}
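The same task as shell commands (a bash sketch; the if/else and $RANDOM stand in for the automatic-renaming logic and are not part of Hadoop):
$ if [ -f ./src/lab1/test2.txt ]; then
      hdfs dfs -copyToLocal /test/test.txt ./src/lab1/test2_$RANDOM.txt
  else
      hdfs dfs -copyToLocal /test/test.txt ./src/lab1/test2.txt
  fi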
3) Print the contents of a specified HDFS file to the terminal;
package lab1;
import java.io.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

public class test3 {
    // Print the contents of an HDFS file to the terminal, line by line.
    public static void test3(FileSystem hdfs, Path srcPath) {
        try {
            if (!hdfs.exists(srcPath)) {
                System.out.println(srcPath + " does not exist");
            } else {
                FSDataInputStream in = hdfs.open(srcPath);
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in));
                String line;
                while ((line = bufferedReader.readLine()) != null) {
                    System.out.println(line);
                }
                bufferedReader.close(); // also closes the underlying FSDataInputStream
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("hdfs://localhost:9000/test/test.txt");
        test3(fs, path);
    }
}
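Shell equivalent:
$ hdfs dfs -cat /test/test.txt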
4) Display the permissions, size, creation time, path, and other information of a specified HDFS file;
package lab1;
import java.io.*;
import java.text.SimpleDateFormat; // used to format timestamps
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

public class test4 {
    // Print the permissions, size, timestamp, and path of an HDFS file.
    // FileStatus only exposes a modification time; HDFS keeps no separate
    // creation time, so the modification time is reported here.
    public static void test4(FileSystem hdfs, Path srcPath) {
        try {
            FileStatus[] fileStatus = hdfs.listStatus(srcPath);
            for (FileStatus status : fileStatus) {
                System.out.println("Permissions: " + status.getPermission().toString());
                System.out.println("Size: " + status.getLen());
                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                System.out.println("Creation time: " + format.format(status.getModificationTime()));
                System.out.println("Path: " + status.getPath());
            }
        } catch (FileNotFoundException e) {
            System.out.println(srcPath + " does not exist");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("hdfs://localhost:9000/test/test.txt");
        test4(fs, path);
    }
}
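Shell equivalent; -ls on a file path prints its permissions, size, modification time, and path on one line (-h makes the size human-readable):
$ hdfs dfs -ls -h /test/test.txt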
5) Given a directory in HDFS, recursively output the permissions, size, creation time, path, and other information of every file under it;
package lab1;
import java.io.*;
import java.text.SimpleDateFormat; // used to format timestamps
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

public class test5 {
    // Recursively list every file under srcPath with its permissions, size,
    // timestamp, and path; listFiles(path, true) walks all subdirectories.
    public static void test5(FileSystem hdfs, Path srcPath) {
        try {
            RemoteIterator<LocatedFileStatus> iterator = hdfs.listFiles(srcPath, true);
            while (iterator.hasNext()) {
                FileStatus status = iterator.next();
                System.out.println("Permissions: " + status.getPermission().toString());
                System.out.println("Size: " + status.getLen());
                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                System.out.println("Creation time: " + format.format(status.getModificationTime()));
                System.out.println("Path: " + status.getPath());
                System.out.println();
            }
        } catch (FileNotFoundException e) {
            System.out.println(srcPath + " does not exist");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("hdfs://localhost:9000/ex1");
        test5(fs, path);
    }
}
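Shell equivalent; -R lists the directory recursively:
$ hdfs dfs -ls -R /ex1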
6) Given the path of a file inside HDFS, create and delete that file. If the directory containing the file does not exist, create the directory automatically;
package lab1;
import java.io.*;
import java.util.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

public class test6 {
    // Create or delete the file at filePath; when creating, make the parent
    // directory first if it is missing.
    public static void test6(FileSystem hdfs, Path filePath) {
        Path dirPath = filePath.getParent();
        try {
            System.out.println("Create the file (y) or delete it (n)? (y/n)");
            if (new Scanner(System.in).next().equals("y")) {
                if (!hdfs.exists(dirPath)) {
                    System.out.println(dirPath + " does not exist");
                    hdfs.mkdirs(dirPath);
                    System.out.println(dirPath + " created");
                }
                hdfs.create(filePath).close(); // close the stream to finish creating the empty file
                System.out.println(filePath + " created");
            } else {
                if (!hdfs.exists(filePath)) {
                    System.out.println(filePath + " does not exist");
                } else {
                    hdfs.delete(filePath, true);
                    System.out.println(filePath + " deleted");
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("hdfs://localhost:9000/test6/cre.txt");
        test6(fs, path);
    }
}
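Shell equivalents, reusing the paths from the Java code (-touchz creates an empty file; -mkdir -p succeeds whether or not the directory already exists):
$ hdfs dfs -mkdir -p /test6          # create the parent directory if missing
$ hdfs dfs -touchz /test6/cre.txt    # create the file
$ hdfs dfs -rm /test6/cre.txt        # delete the file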
7) Given the path of an HDFS directory, create and delete that directory. When creating, automatically create any missing parent directories; when deleting, remove the directory only if it is empty, and leave it in place otherwise;
package lab1;
import java.io.*;
import java.util.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

public class test7 {
    // Create or delete the directory at dirPath; delete it only when empty.
    public static void test7(FileSystem hdfs, Path dirPath) {
        try {
            System.out.println("Create the directory (y) or delete it (n)? (y/n)");
            if (new Scanner(System.in).next().equals("y")) {
                if (hdfs.exists(dirPath)) {
                    System.out.println(dirPath + " already exists");
                } else {
                    System.out.println(dirPath + " does not exist");
                    hdfs.mkdirs(dirPath); // also creates any missing parent directories
                    System.out.println(dirPath + " created");
                }
            } else {
                if (!hdfs.exists(dirPath)) {
                    System.out.println(dirPath + " does not exist");
                } else {
                    FileStatus[] fileStatus = hdfs.listStatus(dirPath);
                    if (fileStatus.length == 0) {
                        // The directory is known to be empty, so a
                        // non-recursive delete is sufficient and safer.
                        hdfs.delete(dirPath, false);
                        System.out.println(dirPath + " is empty; deleted");
                    } else {
                        System.out.println(dirPath + " is not empty; not deleted. It contains:");
                        for (FileStatus status : fileStatus) {
                            System.out.println(status.getPath());
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("hdfs://localhost:9000/test");
        test7(fs, path);
    }
}
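Shell equivalents; -mkdir -p creates missing parents, and -rmdir refuses to delete a non-empty directory, matching the required behavior:
$ hdfs dfs -mkdir -p /test
$ hdfs dfs -rmdir /test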
8) Append content to a specified HDFS file, letting the user choose whether it goes at the beginning or the end of the existing file;
package lab1;
import java.io.*;
import java.util.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

public class test8 {
    // Append the local file at localPath to the HDFS file at remotePath,
    // either at the beginning or at the end, as chosen by the user.
    private static void test8(FileSystem hdfs, Path remotePath, Path localPath) {
        try {
            if (!hdfs.exists(remotePath)) {
                System.out.println("File does not exist");
                return;
            }
            System.out.println("Append to the beginning (y) or the end (n)? (y/n)");
            if (new Scanner(System.in).next().equals("y")) {
                // HDFS cannot prepend in place: move the original to a local
                // temporary file, recreate the HDFS file, then write the new
                // content followed by the original content.
                Path path_tmp = new Path("./src/lab1/tmp.txt");
                hdfs.moveToLocalFile(remotePath, path_tmp);
                FSDataOutputStream out = hdfs.create(remotePath);
                FileInputStream newContent = new FileInputStream(localPath.toString());
                FileInputStream oldContent = new FileInputStream(path_tmp.toString());
                byte[] bytes = new byte[1024];
                int read;
                while ((read = newContent.read(bytes)) > 0) {
                    out.write(bytes, 0, read);
                }
                while ((read = oldContent.read(bytes)) > 0) {
                    out.write(bytes, 0, read);
                }
                newContent.close();
                oldContent.close();
                out.close();
                new File(path_tmp.toString()).delete(); // remove the local temporary file
                System.out.println("Appended to the beginning");
            } else {
                FileInputStream inputStream = new FileInputStream(localPath.toString());
                FSDataOutputStream outputStream = hdfs.append(remotePath);
                byte[] bytes = new byte[1024];
                int read;
                while ((read = inputStream.read(bytes)) > 0) {
                    outputStream.write(bytes, 0, read);
                }
                inputStream.close();
                outputStream.close();
                System.out.println("Appended to the end");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        FileSystem fs = FileSystem.get(conf);
        Path path1 = new Path("hdfs://localhost:9000/test/test.txt");
        Path path2 = new Path("./src/lab1/test2.txt");
        test8(fs, path1, path2);
    }
}
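Shell equivalents. Appending to the end is a single command; prepending has no direct command, so the usual workaround is download, concatenate locally, and overwrite (tmp.txt and merged.txt are illustrative names):
$ hdfs dfs -appendToFile ./src/lab1/test2.txt /test/test.txt     # append to the end
$ hdfs dfs -copyToLocal /test/test.txt ./tmp.txt                 # prepend: fetch the original
$ cat ./src/lab1/test2.txt ./tmp.txt > ./merged.txt              # new content first
$ hdfs dfs -copyFromLocal -f ./merged.txt /test/test.txt         # overwrite with the result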
9) Delete a specified file from HDFS;
package lab1;
import java.io.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

public class test9 {
    // Delete the file at remotePath; delete() returns false if the path
    // does not exist.
    private static void test9(FileSystem hdfs, Path remotePath) {
        try {
            if (hdfs.delete(remotePath, true)) {
                System.out.println("Deleted");
            } else {
                System.out.println("Deletion failed");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("hdfs://localhost:9000/test/aa.txt");
        test9(fs, path);
    }
}
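Shell equivalent:
$ hdfs dfs -rm /test/aa.txt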
10) Move a file from a source path to a destination path within HDFS.
package lab1;
import java.io.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

public class test10 {
    // Move a file within HDFS. rename() returns false if, for example, the
    // source does not exist or the destination's parent directory is missing.
    private static void test10(FileSystem hdfs, Path oldRemotePath, Path newRemotePath) {
        try {
            if (hdfs.rename(oldRemotePath, newRemotePath)) {
                System.out.println("Moved");
            } else {
                System.out.println("Move failed");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        FileSystem fs = FileSystem.get(conf);
        Path path1 = new Path("hdfs://localhost:9000/test/a.txt");
        Path path2 = new Path("hdfs://localhost:9000/test/test2/a.txt");
        test10(fs, path1, path2);
    }
}
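Shell equivalent:
$ hdfs dfs -mv /test/a.txt /test/test2/a.txt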