You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

89 lines
2.8 KiB
JavaScript

3 years ago
/* eslint-disable no-undef */
"use strict";
const Promise = require("bluebird");
const fs = require("fs").promises;
const path = require("path");
const unreachable = require("@joepie91/unreachable")("@promistream/walk-folder");
const pipe = require("@promistream/pipe");
const simpleQueue = require("@promistream/simple-queue");
const parallelize = require("@promistream/parallelize");
const mapFilter = require("@promistream/map-filter");
const { validateArguments } = require("@validatem/core");
const required = require("@validatem/required");
const isBoolean = require("@validatem/is-boolean");
const isString = require("@validatem/is-string");
const defaultTo = require("@validatem/default-to");
// NOTE: The stat object is already exposed alongside the path — every emitted item has the shape `{ path, stats }` — so the earlier TODO asking for this is resolved.
module.exports = function createWalkFolderStream(_rootPath, _options) {
let [ rootPath, options ] = validateArguments(arguments, {
rootPath: [ required, isString ],
options: [ defaultTo({}), {
linear: [ defaultTo(false), isBoolean ],
followSymlinks: [ defaultTo(true), isBoolean ],
throughStream: [ defaultTo(null) ] // FIXME: either null or isPromistream, take implementation from `pipe`
}]
});
// linear: Whether to walk the tree linearly (breadth-first) instead of randomly. This will make output deterministic, usually at the cost of being slower. On some filesystems that deal poorly with concurrent I/O, this may actually be a little faster.
// linear -> sort + no parallelize, random -> parallelize infinity unordered
let queue = simpleQueue([ rootPath ]);
let seenInodes = new Set(); // NOTE: Used for detecting and avoiding infinite loops through eg. symlinks
function queueDirectoryContents(targetPath) {
return Promise.try(() => {
return fs.readdir(targetPath);
}).then((entries) => {
if (options.linear === true) {
entries.sort(); // To make the output deterministic
}
return entries;
}).map((item) => {
return queue.push(path.join(targetPath, item));
});
}
let statStream = mapFilter((targetPath) => {
return Promise.try(() => {
if (options.followSymlinks === true) {
return fs.stat(targetPath);
} else {
return fs.lstat(targetPath);
}
}).then((stats) => {
if (stats.ino == null) {
throw new unreachable("Stats without inode encountered");
} else if (!seenInodes.has(stats.ino)) {
seenInodes.add(stats.ino);
return Promise.try(() => {
if (stats.isDirectory()) {
return queueDirectoryContents(targetPath);
}
}).then(() => {
return { path: targetPath, stats: stats };
});
} else {
return mapFilter.NoValue;
}
}).catch((error) => {
console.error(targetPath, error);
});
});
return pipe([
queue.stream,
options.throughStream,
statStream,
(options.linear !== false)
? parallelize(Infinity, { ordered: false })
: null
]);
};