import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.*;
/**
 * Static helpers for common HDFS operations: listing, copying, moving, deleting, reading and
 * writing files.
 *
 * <p>Most methods use a single shared {@link FileSystem} that is created once when the class is
 * loaded (from the configuration returned by {@link #getJobConf()}) and closed by a JVM shutdown
 * hook. Those methods log failures and report them through a {@code false} / {@code null} return
 * value instead of throwing.
 *
 * <p>{@code rename}, {@code renameFile}, {@code fileName}, {@code deleteFile} and
 * {@code isExist} instead obtain a file system through {@code FileSystem.get} with a lazily
 * created default {@link Configuration}; {@code fileNum} and {@code hdfsFolderCapacity} use the
 * Hadoop configuration of the supplied Spark context.
 */
public class HdfsUtils {
    /** Lazily created by {@link #init()}; used only by the {@code FileSystem.get(configuration)} helpers. */
    private static Configuration configuration = null;
    private static final Logger logger = LoggerFactory.getLogger(HdfsUtils.class);
    /** Shared file system; {@code null} if {@link #createFileSystem()} failed. */
    private static final FileSystem fs;

    static {
        fs = createFileSystem();
        // NOTE(review): FileSystem.get may return an instance cached by Hadoop and shared with other
        // code in this JVM; closing it here at shutdown could affect them -- confirm this is intended.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                if (fs == null) {
                    // Creation already failed (and was logged); there is nothing to close.
                    return;
                }
                try {
                    logger.info("closing hdfs fileSystem");
                    fs.close();
                    logger.info("hdfs fileSystem closed");
                } catch (IOException ioEx) {
                    logger.error("close failed, {}", ioEx.getMessage(), ioEx);
                }
            }
        });
    }

    /**
     * Returns a file system for {@code hdfs:///} accessed as the given user, built from a default
     * {@link Configuration}.
     *
     * @param user the user to act as
     * @return the file system
     */
    public static FileSystem getHadoopFileSystem(String user) throws URISyntaxException, IOException, InterruptedException {
        Configuration conf = new Configuration();
        return FileSystem.get(new URI("hdfs:///"), conf, user);
    }

    /**
     * Builds a job configuration named {@code "HDFS"}.
     *
     * <p>If both {@code /etc/hadoop/conf/core-site.xml} and {@code hdfs-site.xml} exist, only those
     * files are loaded (defaults disabled); otherwise {@code core-site.xml} and
     * {@code hdfs-site.xml} are loaded as classpath resources on top of the default configuration.
     *
     * @return a new job configuration
     */
    public static JobConf getJobConf() {
        JobConf jobConf;
        String confDir = "/etc/hadoop/conf/";
        String coreSiteXml = confDir + "core-site.xml";
        String hdfsSiteXml = confDir + "hdfs-site.xml";
        if (new File(coreSiteXml).exists()
                && new File(hdfsSiteXml).exists()) {
            logger.debug("loading hdfs conf from {} and {}", coreSiteXml, hdfsSiteXml);
            jobConf = new JobConf(false);
            jobConf.addResource(new Path(coreSiteXml));
            jobConf.addResource(new Path(hdfsSiteXml));
        } else {
            logger.debug("loading hdfs conf from local jar");
            Configuration localConf = new Configuration();
            localConf.addResource("core-site.xml");
            localConf.addResource("hdfs-site.xml");
            jobConf = new JobConf(localConf, HdfsUtils.class);
        }
        jobConf.setJobName("HDFS");
        return jobConf;
    }

    /**
     * Creates the shared file system for the default URI of {@link #getJobConf()}.
     *
     * @return the file system, or {@code null} if it could not be obtained (the error is logged)
     */
    private static FileSystem createFileSystem() {
        JobConf jobConf = getJobConf();
        URI uri = FileSystem.getDefaultUri(jobConf);
        FileSystem fileSystem = null;
        try {
            fileSystem = FileSystem.get(uri, jobConf);
        } catch (IOException e) {
            logger.error("get hdfs file system failed.", e);
        }
        return fileSystem;
    }

    /**
     * Concatenates every regular file directly inside {@code targetDir} into {@code outputFile},
     * then deletes {@code targetDir}.
     *
     * <p>Any existing {@code outputFile} is removed first. On failure the partial output is
     * removed and {@code targetDir} is left untouched.
     *
     * @param targetDir  directory whose top-level files are merged (in listing order)
     * @param outputFile destination file
     */
    public static void merge(String targetDir, String outputFile) {
        rm(outputFile);
        try (FSDataOutputStream os = fs.create(new Path(outputFile))) {
            FileStatus[] list = ls(targetDir);
            if (list == null) {
                // ls() already logged the cause; route through the common failure path below.
                throw new IOException("cannot list " + targetDir);
            }
            for (FileStatus file : list) {
                if (file.isFile()) {
                    try (FSDataInputStream fsdis = fs.open(file.getPath())) {
                        IOUtils.copyLarge(fsdis, os);
                    }
                }
            }
        } catch (IOException e) {
            logger.error("merge faild,targetDir:" + targetDir + ",outputFile:" + outputFile, e);
            rm(outputFile);
            return;
        }
        // Only delete the sources once the output stream has been closed successfully;
        // deleting them earlier would lose data if the final close/flush failed.
        rm(targetDir);
    }

    /**
     * Creates {@code folder} if it does not already exist.
     *
     * @param folder directory path
     * @return {@code true} only if the directory did not exist and {@code mkdirs} reported
     *         success; {@code false} if it already existed or on error (logged)
     */
    public static boolean mkdirs(String folder) {
        try {
            Path path = new Path(folder);
            if (!fs.exists(path)) {
                boolean result = fs.mkdirs(path);
                logger.debug("mkdirs: " + folder + ",result:" + result);
                return result;
            }
        } catch (IOException e) {
            logger.error("mkdirs faild,folder:" + folder, e);
        }
        return false;
    }

    /**
     * Lists the direct children of {@code folder}, optionally filtered.
     *
     * @param folder     directory to list
     * @param pathFilter filter to apply, or {@code null} for no filtering
     * @return the statuses, or {@code null} on error (logged)
     */
    public static FileStatus[] ls(String folder, PathFilter pathFilter) {
        try {
            FileStatus[] list;
            if (pathFilter == null) {
                list = fs.listStatus(new Path(folder));
            } else {
                list = fs.listStatus(new Path(folder), pathFilter);
            }
            if (logger.isTraceEnabled()) {
                logger.trace("ls: " + folder + ",item size:" + list.length);
                logger.trace("==========================================================");
                for (FileStatus f : list) {
                    logger.trace("name: " + f.getPath() + ", folder: " + f.isDirectory() + ", size: " + f.getLen());
                }
                logger.trace("==========================================================");
            }
            return list;
        } catch (IOException e) {
            logger.error("ls faild,folder:" + folder, e);
        }
        return null;
    }

    /**
     * Lists the direct children of {@code folder} without filtering.
     *
     * @param folder directory to list
     * @return the statuses, or {@code null} on error (logged)
     */
    public static FileStatus[] ls(String folder) {
        return ls(folder, null);
    }

    /**
     * Converts file statuses to their path strings.
     *
     * @param fsArr statuses to convert
     * @return the path strings in the same order, or {@code null} if {@code fsArr} is
     *         {@code null} or empty
     */
    public static String[] fileStatus2PathStr(FileStatus... fsArr) {
        if (fsArr == null || fsArr.length == 0) {
            return null;
        }
        String[] pathArr = new String[fsArr.length];
        for (int i = 0; i < fsArr.length; i++) {
            pathArr[i] = fsArr[i].getPath().toString();
        }
        return pathArr;
    }

    /**
     * Recursively lists all files under each of the given folders.
     *
     * @param folder one or more root folders
     * @return file path strings, or {@code null} on error (logged)
     */
    public static List<String> listFiles(String... folder) {
        try {
            List<String> fileList = new ArrayList<>();
            for (String s : folder) {
                RemoteIterator<LocatedFileStatus> fileIterator = fs.listFiles(new Path(s), true);
                while (fileIterator.hasNext()) {
                    fileList.add(fileIterator.next().getPath().toString());
                }
            }
            return fileList;
        } catch (IOException e) {
            logger.error("listFiles faild", e);
            return null;
        }
    }

    /**
     * Writes {@code content} as UTF-8 to {@code file}.
     *
     * @return {@code true} on success, {@code false} on error (logged)
     */
    public static boolean createFile(String file, String content) {
        try (FSDataOutputStream os = fs.create(new Path(file))) {
            IOUtils.write(content, os, "UTF-8");
            logger.debug("Create file: " + file);
            return true;
        } catch (IOException e) {
            logger.error("createFile faild,file:" + file, e);
        }
        return false;
    }

    /**
     * Renames {@code src} to {@code dst} on the shared file system.
     *
     * @return the result of {@code rename}, or {@code false} on error (logged)
     */
    public static boolean mv(String src, String dst) {
        try {
            boolean result = fs.rename(new Path(src), new Path(dst));
            logger.debug("mv from " + src + " to " + dst + ",result:" + result);
            return result;
        } catch (IOException e) {
            logger.error("mv faild,src:" + src + ",dst:" + dst, e);
        }
        return false;
    }

    /**
     * Recursively deletes {@code folder} (file or directory).
     *
     * @return the result of {@code delete}, or {@code false} on error (logged)
     */
    public static boolean rm(String folder) {
        try {
            boolean result = fs.delete(new Path(folder), true);
            logger.debug("rm : " + folder + ",result:" + result);
            return result;
        } catch (IOException e) {
            logger.error("rm faild:" + folder, e);
        }
        return false;
    }

    /** Uploads a local file to HDFS and deletes the local source. */
    public static boolean copyFromLocalFileAndDelSrc(String local, String remote) {
        return copyFromLocalFile(true, local, remote);
    }

    /**
     * Uploads a local file to HDFS with the overwrite flag set.
     *
     * @param delSrc whether to delete the local source afterwards
     * @return {@code true} on success, {@code false} on error (logged)
     */
    public static boolean copyFromLocalFile(boolean delSrc, String local, String remote) {
        try {
            fs.copyFromLocalFile(delSrc, true, new Path(local), new Path(remote));
            logger.debug("copyFromLocalFile from: " + local + " to " + remote);
            return true;
        } catch (IOException e) {
            logger.error("copyFromLocalFile faild,from:" + local + ",to:" + remote, e);
        }
        return false;
    }

    /** Copies {@code remote} to the local path {@code local}; errors are logged, not thrown. */
    public static void download(String remote, String local) {
        try {
            fs.copyToLocalFile(new Path(remote), new Path(local));
            logger.debug("download: from" + remote + " to " + local);
        } catch (IOException e) {
            logger.error("download faild,from" + remote + " to " + local, e);
        }
    }

    /** Copies the content of {@code remote} into {@code os}, leaving {@code os} open. */
    public static void writeToOS(String remote, OutputStream os) {
        writeToOS(remote, os, false);
    }

    /**
     * Copies the content of {@code remote} into {@code os}; errors are logged, not thrown.
     *
     * @param closeOutputStream if {@code true}, {@code os} is closed quietly afterwards, even when
     *                          the copy fails
     */
    public static void writeToOS(String remote, OutputStream os, boolean closeOutputStream) {
        try (FSDataInputStream is = fs.open(new Path(remote))) {
            IOUtils.copy(is, os);
            logger.debug("writeToOS: from" + remote);
        } catch (IOException e) {
            logger.error("writeToOS faild,from" + remote, e);
        } finally {
            // finally: also close when an unchecked exception escapes the copy.
            if (closeOutputStream) {
                IOUtils.closeQuietly(os);
            }
        }
    }

    /**
     * Downloads {@code remote} to {@code local}, then deletes the local {@code .<name>.crc}
     * side file next to it, if present.
     */
    public static void downloadAndDelCRC(String remote, String local) {
        download(remote, local);
        String crcFile = FilenameUtils.getFullPath(local) + "." + FilenameUtils.getName(local) + ".crc";
        FileUtils.deleteQuietly(new File(crcFile));
    }

    /** Reads {@code remoteFile} as UTF-8; returns {@code null} on error (logged). */
    public static String cat(String remoteFile) {
        return cat(remoteFile, "UTF-8");
    }

    /**
     * Reads the whole of {@code remoteFile} as a string.
     *
     * @param encoding charset name used to decode the bytes
     * @return the content, or {@code null} on error (logged)
     */
    public static String cat(String remoteFile, String encoding) {
        try (FSDataInputStream fsdis = fs.open(new Path(remoteFile))) {
            String content = IOUtils.toString(fsdis, encoding);
            logger.debug("cat: " + remoteFile);
            return content;
        } catch (IOException e) {
            logger.error("cat faild,remoteFile:" + remoteFile, e);
        }
        return null;
    }

    /** Returns whether {@code f} is a file; {@code false} on error (logged). */
    public static boolean isFile(String f) {
        try {
            boolean result = fs.isFile(new Path(f));
            logger.debug("isFile: " + result);
            return result;
        } catch (IOException e) {
            logger.error("isFile faild:" + f, e);
        }
        return false;
    }

    /** Returns whether {@code f} is a directory; {@code false} on error (logged). */
    public static boolean isDirectory(String f) {
        try {
            boolean result = fs.isDirectory(new Path(f));
            logger.debug("isDirectory: " + result);
            return result;
        } catch (IOException e) {
            logger.error("isDirectory faild:" + f, e);
        }
        return false;
    }

    /**
     * Checks whether {@code path} exists.
     *
     * <p>If the path contains {@code *} or {@code [} it is treated as a glob, and the check
     * succeeds only when some match is a file or a non-empty directory. Otherwise a plain
     * existence check is done.
     *
     * @return the result, or {@code false} on error (logged)
     */
    public static boolean exists(String path) {
        try {
            boolean result = false;
            Path p = new Path(path);
            if (path.contains("*") || path.contains("[")) {
                FileStatus[] fileStatuses = fs.globStatus(p);
                // globStatus may return null rather than an empty array; treat that as "no match".
                if (fileStatuses != null) {
                    for (FileStatus curPath : fileStatuses) {
                        if (curPath.isFile()) {
                            result = true;
                            break;
                        }
                        FileStatus[] subFiles = ls(curPath.getPath().toString());
                        if (subFiles != null && subFiles.length > 0) {
                            result = true;
                            break;
                        }
                    }
                }
            } else {
                result = fs.exists(p);
            }
            logger.debug("exists: " + result);
            return result;
        } catch (IOException e) {
            logger.error("exists method failed: " + path, e);
        }
        return false;
    }

    /**
     * Returns the path of the most recently modified direct child of {@code folder}.
     *
     * <p>Ties are resolved in favour of the entry listed first.
     *
     * @return the path, or {@code null} if the folder is empty or on error (logged)
     */
    public static String findLatest(String folder) {
        try {
            FileStatus latest = null;
            // Single pass instead of sorting the whole listing just to take its first element.
            for (FileStatus status : fs.listStatus(new Path(folder))) {
                if (latest == null || status.getModificationTime() > latest.getModificationTime()) {
                    latest = status;
                }
            }
            return latest == null ? null : latest.getPath().toString();
        } catch (IOException e) {
            logger.error("findLatest faild,folder:" + folder, e);
        }
        return null;
    }

    /**
     * Recursively lists all sub-directories of {@code dir}.
     *
     * @return directory path strings, or {@code null} on error (logged)
     */
    public static String[] listDirectories(final String dir) {
        try {
            return listDirectories(dir, true);
        } catch (IOException e) {
            logger.error("listDirectories failed, dir:" + dir, e);
            return null;
        }
    }

    /**
     * Lists the sub-directories of {@code dir}, depth-first.
     *
     * <p>Each directory is emitted before its own children. When {@code recursive} is set, the
     * traversal descends into each directory as soon as it is emitted, and the parent listings
     * are kept on an explicit stack.
     *
     * @param dir       root directory (not itself included)
     * @param recursive whether to descend into sub-directories
     * @return directory path strings
     * @throws IOException if a listing fails
     */
    public static String[] listDirectories(final String dir, final boolean recursive) throws IOException {
        RemoteIterator<LocatedFileStatus> remoteIterator = new RemoteIterator<LocatedFileStatus>() {
            /** Parent listings suspended while a child directory is being traversed. */
            private final Deque<RemoteIterator<LocatedFileStatus>> itors = new ArrayDeque<>();
            private RemoteIterator<LocatedFileStatus> curItor = fs.listLocatedStatus(new Path(dir));
            /** Next directory to hand out, or null if it has not been found yet. */
            private LocatedFileStatus curDir;

            @Override
            public boolean hasNext() throws IOException {
                while (curDir == null) {
                    if (curItor.hasNext()) {
                        handleFileStat(curItor.next());
                    } else if (!itors.isEmpty()) {
                        curItor = itors.pop();
                    } else {
                        return false;
                    }
                }
                return true;
            }

            private void handleFileStat(LocatedFileStatus stat) throws IOException {
                if (stat.isDirectory()) {
                    curDir = stat;
                    if (recursive) {
                        itors.push(curItor);
                        curItor = fs.listLocatedStatus(stat.getPath());
                    }
                }
            }

            @Override
            public LocatedFileStatus next() throws IOException {
                if (hasNext()) {
                    LocatedFileStatus result = curDir;
                    curDir = null;
                    return result;
                }
                throw new NoSuchElementException("No more entry in " + dir);
            }
        };
        List<String> dirList = new ArrayList<>();
        while (remoteIterator.hasNext()) {
            dirList.add(remoteIterator.next().getPath().toString());
        }
        return dirList.toArray(new String[0]);
    }

    /**
     * Opens {@code file} for writing; the caller must close the stream.
     *
     * @return the stream, or {@code null} on error (logged)
     */
    public static OutputStream getOutputStream(String file) {
        try {
            return fs.create(new Path(file));
        } catch (IOException e) {
            logger.error("getOutputStream failed, file:" + file, e);
            return null;
        }
    }

    /**
     * Opens {@code file} for reading; the caller must close the stream.
     *
     * @return the stream, or {@code null} on error (logged)
     */
    public static InputStream getInputStream(String file) {
        try {
            return fs.open(new Path(file));
        } catch (IOException e) {
            logger.error("getInputStream failed, file:" + file, e);
            return null;
        }
    }

    /**
     * Copies {@code src} to {@code dst} within the shared file system without deleting the
     * source.
     *
     * @return the copy result, or {@code false} on error (logged)
     */
    public static boolean cp(final String src, final String dst) {
        boolean result = false;
        try {
            result = FileUtil.copy(fs, new Path(src), fs, new Path(dst), false, getJobConf());
            logger.debug("cp from {} to {},result:{}", src, dst, result);
        } catch (IOException e) {
            if (logger.isErrorEnabled()) {
                logger.error("cp faild,src:" + src + ",dst:" + dst, e);
            }
        }
        return result;
    }

    /**
     * Expands {@code path} as a glob.
     *
     * @return the matching statuses (empty if nothing matched), or {@code null} on error (logged)
     */
    public static List<FileStatus> getFileStatus(String path) {
        final Path hdfsPath = new Path(path);
        try {
            final FileStatus[] fileStatuses = fs.globStatus(hdfsPath);
            // globStatus may return null; Arrays.asList(null) would throw an NPE.
            return fileStatuses == null ? Collections.<FileStatus>emptyList() : Arrays.asList(fileStatuses);
        } catch (IOException e) {
            logger.error("getFileStatus failed, path:" + path, e);
            return null;
        }
    }

    /**
     * Renames the first file returned by a recursive listing of {@code path} to
     * {@code newFileName}, keeping it in the same parent directory.
     *
     * @throws RuntimeException wrapping an {@link IOException} from the listing or rename
     */
    public static void rename(String path, String newFileName) throws IOException {
        init();
        FileSystem fileSystem = FileSystem.get(configuration);
        try {
            RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(new Path(path), true);
            LocatedFileStatus fileStatus = iterator.next();
            Path oldPath = fileStatus.getPath();
            if (fileStatus.isFile()) {
                Path newPath = new Path(oldPath.getParent() + "/" + newFileName);
                fileSystem.rename(oldPath, newPath);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Renames {@code oldFile} to {@code newFile}; the rename result is not checked.
     *
     * @throws RuntimeException wrapping an {@link IOException} from the rename
     */
    public static void renameFile(String oldFile, String newFile) throws IOException {
        init();
        FileSystem fileSystem = FileSystem.get(configuration);
        try {
            fileSystem.rename(new Path(oldFile), new Path(newFile));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the name of the first file returned by a recursive listing of {@code path}.
     */
    public static String fileName(String path) throws IOException {
        Path path1 = new Path(path);
        init();
        FileSystem fileSystem = FileSystem.get(configuration);
        RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem.listFiles(path1, true);
        LocatedFileStatus next = locatedFileStatusRemoteIterator.next();
        Path path2 = next.getPath();
        return path2.getName();
    }

    /** Lazily creates the default {@link Configuration} used by the {@code FileSystem.get} helpers. */
    private static void init() {
        if (configuration == null) {
            configuration = new Configuration();
        }
    }

    /** Recursively deletes {@code path}; the delete result is not checked. */
    public static void deleteFile(String path) throws IOException {
        init();
        FileSystem fileSystem = FileSystem.get(configuration);
        fileSystem.delete(new Path(path), true);
    }

    /** Returns whether {@code path} exists. */
    public static Boolean isExist(String path) throws IOException {
        init();
        FileSystem fileSystem = FileSystem.get(configuration);
        return fileSystem.exists(new Path(path));
    }

    /**
     * Returns the file count reported by the content summary of {@code inPath}.
     *
     * @return the count, or 0 if the path does not exist
     * @throws ArithmeticException if the count does not fit in an {@code int}
     */
    public static int fileNum(String inPath, JavaSparkContext sc) throws Exception {
        Path path = new Path(inPath);
        FileSystem fileSystem = FileSystem.get(sc.hadoopConfiguration());
        if (fileSystem.exists(path)) {
            return Math.toIntExact(fileSystem.getContentSummary(path).getFileCount());
        }
        return 0;
    }

    /**
     * Returns the length reported by the content summary of {@code inPath}.
     *
     * @return the length, or 0 if the path does not exist
     */
    public static long hdfsFolderCapacity(String inPath, JavaSparkContext sc) throws Exception {
        Path path = new Path(inPath);
        FileSystem fileSystem = FileSystem.get(sc.hadoopConfiguration());
        if (fileSystem.exists(path)) {
            return fileSystem.getContentSummary(path).getLength();
        }
        return 0L;
    }
}