Как перечислить все файлы в каталоге и его подкаталогах в hadoop hdfs

У меня есть папка в hdfs, у которой есть две подпапки, каждая из которых содержит около 30 подпапок, которые, наконец, содержат хml файлы. Я хочу перечислить все xml файлы, содержащие только основной путь к папке. Локально я могу сделать это с помощью apache commons-io's FileUtils.listFiles(). Я пробовал это

FileStatus[] status = fs.listStatus( new Path( args[ 0 ] ) );

но он отображает только две первые подпапки и не идет дальше. Есть ли способ сделать это в hadoop?

8 ответов

Вам нужно будет использовать объект FileSystem и выполнить некоторую логику для результирующих объектов FileStatus для ручной рекурсии в подкаталоги.

Вы также можете применить PathFilter только для возврата файлов xml с помощью метода listStatus (Path, PathFilter)

В классе hadoop FsShell есть примеры этого для команды hadoop fs -lsr, которая является рекурсивным ls - см. источник, вокруг строки 590 (рекурсивный шаг запускается по строке 635)


Если вы используете hasoop 2. * API, есть более элегантные решения:

Configuration conf = getConf();
 Job job = Job.getInstance(conf);
 FileSystem fs = FileSystem.get(conf);
 //the second boolean parameter here sets the recursion to true
 RemoteIterator<locatedfilestatus> fileStatusListIterator = fs.listFiles(
 new Path("path/to/lib"), true);
 while(fileStatusListIterator.hasNext()){
 LocatedFileStatus fileStatus = fileStatusListIterator.next();
 //do stuff with the file like ...
 job.addFileToClassPath(fileStatus.getPath());
 }
</locatedfilestatus>


Вы пробовали это:

import java.io.*;
import java.util.*;
import java.net.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class cat{
 public static void main (String [] args) throws Exception{
 try{
 FileSystem fs = FileSystem.get(new Configuration());
 FileStatus[] status = fs.listStatus(new Path("hdfs://test.com:9000/user/test/in")); // you need to pass in your hdfs path
 for (int i=0;i


/**
 * @param filePath
 * @param fs
 * @return list of absolute file path present in given path
 * @throws FileNotFoundException
 * @throws IOException
 */
public static List<string> getAllFilePath(Path filePath, FileSystem fs) throws FileNotFoundException, IOException {
 List<string> fileList = new ArrayList<string>();
 FileStatus[] fileStatus = fs.listStatus(filePath);
 for (FileStatus fileStat : fileStatus) {
 if (fileStat.isDirectory()) {
 fileList.addAll(getAllFilePath(fileStat.getPath(), fs));
 } else {
 fileList.add(fileStat.getPath().toString());
 }
 }
 return fileList;
}
</string></string></string>

Быстрый пример: предположим, что у вас есть следующая структура файла:

a -> b
 -> c -> d
 -> e 
 -> d -> f

Используя приведенный выше код, вы получаете:

a/b
a/c/d
a/c/e
a/d/f

Если вам нужен только лист (например, имена файлов), используйте следующий код в блоке else:

...
 } else {
 String fileName = fileStat.getPath().toString(); 
 fileList.add(fileName.substring(fileName.lastIndexOf("/") + 1));
 }

Это даст:

b
d
e
f


Вот фрагмент кода, который подсчитывает количество файлов в определенном каталоге HDFS (я использовал это, чтобы определить, сколько редукторов использовать в конкретном коде ETL). Вы можете легко изменить это, чтобы удовлетворить ваши потребности.

private int calculateNumberOfReducers(String input) throws IOException {
 int numberOfReducers = 0;
 Path inputPath = new Path(input);
 FileSystem fs = inputPath.getFileSystem(getConf());
 FileStatus[] statuses = fs.globStatus(inputPath);
 for(FileStatus status: statuses) {
 if(status.isDirectory()) {
 numberOfReducers += getNumberOfInputFiles(status, fs);
 } else if(status.isFile()) {
 numberOfReducers ++;
 }
 }
 return numberOfReducers;
}
/**
 * Recursively determines number of input files in an HDFS directory
 *
 * @param status instance of FileStatus
 * @param fs instance of FileSystem
 * @return number of input files within particular HDFS directory
 * @throws IOException
 */
private int getNumberOfInputFiles(FileStatus status, FileSystem fs) throws IOException {
 int inputFileCount = 0;
 if(status.isDirectory()) {
 FileStatus[] files = fs.listStatus(status.getPath());
 for(FileStatus file: files) {
 inputFileCount += getNumberOfInputFiles(file, fs);
 }
 } else {
 inputFileCount ++;
 }
 return inputFileCount;
}


Теперь Spark может использовать то же самое и свой путь быстрее, чем другие подходы (например, Hadoop MR). Вот фрагмент кода.

def traverseDirectory(filePath:String,recursiveTraverse:Boolean,filePaths:ListBuffer[String]) {
 val files = FileSystem.get( sparkContext.hadoopConfiguration ).listStatus(new Path(filePath))
 files.foreach { fileStatus => {
 if(!fileStatus.isDirectory() && fileStatus.getPath().getName().endsWith(".xml")) { 
 filePaths+=fileStatus.getPath().toString() 
 }
 else if(fileStatus.isDirectory()) {
 traverseDirectory(fileStatus.getPath().toString(), recursiveTraverse, filePaths)
 }
 }
 } 
}


Спасибо Radu Adrian Moldovan за это предложение.

Вот реализация с использованием очереди:

private static List<string> listAllFilePath(Path hdfsFilePath, FileSystem fs)
throws FileNotFoundException, IOException {
 List<string> filePathList = new ArrayList<string>();
 Queue<path> fileQueue = new LinkedList<path>();
 fileQueue.add(hdfsFilePath);
 while (!fileQueue.isEmpty()) {
 Path filePath = fileQueue.remove();
 if (fs.isFile(filePath)) {
 filePathList.add(filePath.toString());
 } else {
 FileStatus[] fileStatus = fs.listStatus(filePath);
 for (FileStatus fileStat : fileStatus) {
 fileQueue.add(fileStat.getPath());
 }
 }
 }
 return filePathList;
}
</path></path></string></string></string>


не использовать рекурсивный подход (проблемы с кучей):) использовать очередь

queue.add(param_dir)
while (queue is not empty){
 directory= queue.pop
 - get items from current directory
 - if item is file add to a list (final list)
 - if item is directory => queue.push
}

Это было легко, наслаждайтесь!

licensed under cc by-sa 3.0 with attribution.