Initial commit

master
Sven Slootweg 5 years ago
commit b27e87c3e1

.gitignore

@@ -0,0 +1,2 @@
yarn.lock
node_modules

index.js

@@ -0,0 +1,8 @@
"use strict";
module.exports = {
createScraper: require("./scraper"),
mergeUrl: require("./merge-url"),
partialData: require("./partial-data"),
hasPartialData: require("./scraper/item-has-partial-data")
};
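
For context, a minimal usage sketch of the exported API; the backend and scraper modules named here are hypothetical placeholders, not part of this commit:

const { createScraper } = require("scrappie");

const backend = require("./my-backend"); // hypothetical: must implement addUrl, getBatch, storeItem, etc.
const scraperDefinition = require("./my-site-scraper"); // hypothetical: name, initialize, updateUrl, handlers

// createScraper returns a startScraper function; calling it starts working through the URL queue.
let startScraper = createScraper({ backend: backend, scraper: scraperDefinition });

startScraper().catch((err) => {
	console.error(err);
});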

merge-url.js

@@ -0,0 +1,15 @@
"use strict";
const url = require("url");
module.exports = function mergeUrl(baseUrl, newProps) {
let parsedBaseUrl = url.parse(baseUrl);
let fullNewProps = Object.assign(newProps);
if (fullNewProps.query != null) {
fullNewProps.search = null;
}
return url.format(Object.assign({}, parsedBaseUrl, fullNewProps));
};
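
For illustration, roughly what mergeUrl produces (exact encoding is up to Node's legacy url.format):

const mergeUrl = require("./merge-url");

// Swap out the query string while keeping protocol, host and path intact.
mergeUrl("https://example.com/search?page=1", { query: { page: 2, sort: "date" } });
// -> "https://example.com/search?page=2&sort=date"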

package.json

@@ -0,0 +1,17 @@
{
	"name": "scrappie",
	"version": "0.1.0",
	"main": "index.js",
	"repository": "http://git.cryto.net/joepie91/node-scrappie.git",
	"author": "Sven Slootweg <admin@cryto.net>",
	"license": "WTFPL OR CC0-1.0",
	"dependencies": {
		"assure-array": "^1.0.0",
		"bluebird": "^3.5.4",
		"chalk": "^2.4.2",
		"create-error": "^0.3.1",
		"database-error": "^2.0.1",
		"default-value": "^1.0.0",
		"pad": "^3.0.1"
	}
}

partial-data.js

@@ -0,0 +1,8 @@
"use strict";
module.exports = function markPartialData(data) {
return {
__tag: "partialData",
__contents: data
};
};
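
A sketch of how this tag is intended to be used in a handler result, based on how normalize-items and item-has-partial-data (below) unwrap the __tag/__contents wrapper; the field names are invented for the example:

const { partialData } = require("scrappie");

let item = {
	title: "Some article",
	body: partialData("First paragraph only...") // the full text would require scraping another URL
};

// After normalization (see normalize-items below), this item carries a "partialData" tag on `body`,
// and the scraper stores it with isPartial = true via itemHasPartialData.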

scraper/disposable-queue.js

@@ -0,0 +1,28 @@
"use strict";
const Promise = require("bluebird");
/* A bit like Promise.each, including being sequential, but at any point the processing callback can invoke a 'dispose' function to throw away the rest of the queue and complete iteration early. Useful when processing of a specific item can invalidate the items that come after it. */
module.exports = function runDisposableQueue(items, processingCallback) {
let itemIndex = 0; // FIXME: overflow
function processItem() {
let disposeFlag = false;
return Promise.try(() => {
let item = items[itemIndex];
itemIndex += 1;
return processingCallback(item, () => {
disposeFlag = true;
});
}).then(() => {
if (disposeFlag === false && itemIndex < items.length) {
return processItem();
}
});
}
return processItem();
};
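
A small usage sketch of the dispose mechanism described in the comment above; the items and the condition are arbitrary:

const Promise = require("bluebird");
const runDisposableQueue = require("./disposable-queue");

runDisposableQueue([1, 2, 3, 4], (item, dispose) => {
	return Promise.try(() => {
		console.log("processing", item);

		if (item === 2) {
			dispose(); // items 3 and 4 are thrown away; iteration ends after this callback resolves
		}
	});
});
// Logs "processing 1" and "processing 2", then stops.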

scraper/errors.js

@@ -0,0 +1,7 @@
"use strict";
const createError = require("create-error");
module.exports = {
NoHandler: createError("NoHandler")
};

scraper/find-scraper-handler.js

@@ -0,0 +1,13 @@
"use strict";
module.exports = function findScraperHandler(scraper, targetUrl) {
let result = scraper.handlers.find(([regex, _handler]) => regex.exec(targetUrl));
if (result != null) {
let [_regex, _type, handler] = result;
return handler;
} else {
throw new errors.NoHandler(`Scraper does not have a handler for URL in queue: ${updatedUrl}`);
}
};
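
Judging from the destructuring above, scraper.handlers is assumed to be an array of [regex, type, handler] entries; a hypothetical entry might look like this (the pattern, type string and handler body are invented):

let handlers = [
	[/^https:\/\/example\.com\/category\//, "category", (url, { description, metadata }) => {
		// ... fetch and parse the page ...
		return {
			urls: ["https://example.com/item/1"],
			items: []
		};
	}]
];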

scraper/index.js

@@ -0,0 +1,172 @@
"use strict";
const Promise = require("bluebird");
const databaseError = require("database-error");
const assureArray = require("assure-array");
const defaultValue = require("default-value");
const createScrapeLogger = require("./scrape-logger");
const normalizeItems = require("./normalize-items");
const normalizeUrls = require("./normalize-urls");
const runDisposableQueue = require("./disposable-queue");
const errors = require("./errors");
const itemHasPartialData = require("./item-has-partial-data");
const findScraperHandler = require("./find-scraper-handler");
module.exports = function createScraper({ backend, scraper }) {
if (backend == null || scraper == null) {
throw new Error("Must specify both a backend and a scraper");
} else {
let logger = createScrapeLogger({ name: scraper.name, itemFormatter: backend.formatItem });
function addRootUrl({ url, description, metadata }) {
return backend.addUrl({ url, description, metadata, depth: 0 });
}
function getRootOrBatch({ amount, rootUrls }) {
return Promise.try(() => {
return backend.getBatch({ amount });
}).then((results) => {
if (results.length > 0) {
return results;
} else {
return Promise.try(() => {
return Promise.map(assureArray(rootUrls), (rootUrl) => {
return addRootUrl({
url: rootUrl.url,
description: rootUrl.description
});
});
}).then(() => {
return backend.getBatch({ amount });
});
}
}).catch(databaseError.rethrow);
}
function restoreState(url) {
/* NOTE: This will re-fetch every ancestor of the specified URL, from the least-depth to the most-depth ancestor. This will gradually reconstruct any (session) state needed to correctly fetch the URL in question, and continue processing the queue. This only needs to be done for the very first queued URL after crash recovery, since from that point on the state will be identical to before the crash. */
return Promise.try(() => {
return backend.getAncestorsForUrl({ urlId: url.id });
}).then((ancestors) => {
ancestors.reverse();
return Promise.each(ancestors, (ancestorUrl) => {
/* NOTE: We ignore the returned values here. */
logger.debug(`Loading URL for session recovery: ${ancestorUrl.url}`)
return runItem(ancestorUrl);
});
});
}
function runItem(parentUrl) {
return Promise.try(() => {
let updatedUrl = scraper.updateUrl(parentUrl.url);
let handler = findScraperHandler(scraper, updatedUrl);
return Promise.try(() => {
// logger.debug(`Scraping: ${updatedUrl}`);
if (parentUrl.description != null) {
logger.info(`Scraping: ${parentUrl.description}`);
logger.debug(`URL: ${updatedUrl}`)
} else {
logger.info(`Scraping: ${updatedUrl}`);
}
return handler(updatedUrl, {
description: parentUrl.description,
metadata: defaultValue(parentUrl.metadata, {})
});
}).then((result) => {
let normalizedUrls = normalizeUrls(result.urls, { parentUrl: parentUrl });
let normalizedItems = normalizeItems(result.items);
return {
urls: normalizedUrls,
items: normalizedItems
};
});
});
}
function runBatch(batch) {
return Promise.try(() => {
return runDisposableQueue(batch, (parentUrl, disposeQueue) => {
return Promise.try(() => {
return runItem(parentUrl);
}).then(({urls, items}) => {
if (urls.some((url) => url.depth > parentUrl.depth)) {
/* Since we always want to scrape depth-first, we always want our processing queue to only contain the highest-depth items that exist in the backing store at that time. Therefore, if we encounter any new to-be-queued URLs that are going to have a higher depth than the current item, we immediately throw away the entirety of the queue, triggering a re-fetch from the backing store for a new batch at the new maximum depth. */
logger.debug(`Disposing URL queue`);
disposeQueue();
}
return Promise.all([
Promise.map(urls, (url) => {
if (url.description != null) {
logger.debug(`Queueing new URL: ${url.description} (${url.url})`);
} else {
logger.debug(`Queueing new URL: ${url.url}`);
}
return backend.addUrl({
parentUrlId: parentUrl.id,
depth: url.depth,
url: url.url,
description: url.description,
metadata: url.metadata
});
}),
Promise.map(items, (item) => {
logger.done(item);
logger.debug(`Encountered item`, item);
return backend.storeItem({
parentUrlId: parentUrl.id,
data: item,
isPartial: itemHasPartialData(item)
});
})
]);
}).then(() => {
return backend.markUrlDone({ urlId: parentUrl.id });
}).catch(errors.NoHandler, (err) => {
logger.error(err.message);
});
});
}).then(() => {
return backend.getBatch({
amount: 20
});
}).then((nextBatch) => {
if (nextBatch.length > 0) {
return runBatch(nextBatch);
}
});
}
return function startScraper() {
return Promise.try(() => {
return scraper.initialize();
}).then((rootUrls) => {
return getRootOrBatch({
rootUrls: normalizeUrls(rootUrls),
amount: 20
});
}).then((firstBatch) => {
if (firstBatch.length > 0) {
return Promise.try(() => {
if (firstBatch[0].depth > 0) {
/* We only need to do this for URLs other than the root URLs. */
return restoreState(firstBatch[0]);
}
}).then(() => {
return runBatch(firstBatch);
});
} else {
throw new Error("No URLs in queue after scraper initialization");
}
});
}
}
};
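
Putting the pieces together: a hedged sketch of the scraper definition shape implied by the calls above (scraper.name, scraper.initialize, scraper.updateUrl, scraper.handlers). The site and handler logic are invented, and the backend is expected to provide addUrl, getBatch, getAncestorsForUrl, markUrlDone, storeItem and formatItem as used above:

// my-site-scraper.js (hypothetical)
module.exports = {
	name: "example-site",

	// Called once at startup; the returned URL(s) are queued as roots at depth 0.
	initialize: function () {
		return [{ url: "https://example.com/", description: "Front page" }];
	},

	// A chance to rewrite a queued URL just before it is fetched (e.g. to refresh session tokens).
	updateUrl: function (url) {
		return url;
	},

	handlers: [
		[/^https:\/\/example\.com\/$/, "frontpage", (url, { description, metadata }) => {
			// ... fetch and parse ...
			return {
				urls: [{ url: "https://example.com/item/1", description: "First item" }],
				items: []
			};
		}]
	]
};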

scraper/item-has-partial-data.js

@@ -0,0 +1,8 @@
"use strict";
module.exports = function itemHasPartialData(item) {
return Object.keys(item.tags).some((property) => {
let propertyTags = item.tags[property];
return propertyTags.includes("partialData");
});
};

scraper/normalize-items.js

@@ -0,0 +1,41 @@
"use strict";
const assureArray = require("assure-array");
function normalizeTags(item) {
let tags = { _item: [] };
let data = {};
let canonicalItem = item;
while (canonicalItem.__tag != null) {
tags._item.push(canonicalItem.__tag);
canonicalItem = canonicalItem.__content;
}
for (let property of Object.keys(canonicalItem)) {
let value = canonicalItem[property];
tags[property] = [];
while (value != null && value.__tag != null) {
tags[property].push(value.__tag);
value = value.__contents;
}
data[property] = value;
}
return {
tags: tags,
data: data
};
}
module.exports = function normalizeItems(items) {
if (items == null) {
return [];
} else {
return assureArray(items).map((item) => normalizeTags(item));
}
};
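
For illustration, the shape this produces for an item that was wrapped with partialData as a whole (the field name is invented); field-level tags end up under that field's key instead of _item:

const partialData = require("../partial-data");
const normalizeItems = require("./normalize-items");

normalizeItems([ partialData({ title: "Some article" }) ]);
/* ->
[{
	tags: { _item: ["partialData"], title: [] },
	data: { title: "Some article" }
}]
*/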

scraper/normalize-urls.js

@@ -0,0 +1,31 @@
"use strict";
const assureArray = require("assure-array");
function objectifyUrl(url) {
if (typeof url === "string") {
return { url: url };
} else {
return url;
}
}
module.exports = function normalizeUrls(urls, {parentUrl} = {}) {
if (urls == null) {
return [];
} else {
return assureArray(urls).map((url) => {
let urlObject = objectifyUrl(url);
if (parentUrl != null) {
urlObject.depth = (urlObject.sameDepth)
? parentUrl.depth
: parentUrl.depth + 1;
urlObject.parentUrlId = parentUrl.id;
}
return urlObject;
});
}
};
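
An illustration of the two accepted URL shapes, with an invented parent URL record:

const normalizeUrls = require("./normalize-urls");

let parentUrl = { id: 42, depth: 1 };

normalizeUrls([
	"https://example.com/item/1",                          // plain string form
	{ url: "https://example.com/page/2", sameDepth: true } // object form; stays at the parent's depth
], { parentUrl: parentUrl });
/* ->
[
	{ url: "https://example.com/item/1", depth: 2, parentUrlId: 42 },
	{ url: "https://example.com/page/2", sameDepth: true, depth: 1, parentUrlId: 42 }
]
*/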

scraper/scrape-logger.js

@@ -0,0 +1,64 @@
"use strict";
const chalk = require("chalk");
const util = require("util");
const pad = require("pad");
function concatenateArguments(args) {
let stringified = args.map((arg) => {
if (typeof arg === "string") {
return arg;
} else {
return util.inspect(arg, {depth: null, colors: false});
}
});
return stringified.join(" ");
}
let minimumLabelWidth = 9;
function padLabel(label) {
/* The template string below is to ensure that the label is always passed in as a string, otherwise the `pad` library will get confused. */
return pad(`${label}`, minimumLabelWidth);
}
module.exports = function createScrapeLogger({ name, itemFormatter }) {
return {
log: function log(message) {
console.log(`${chalk.bold.gray(`[${name}]`)} ${message}`);
},
logWithLabel: function logWithLabel(color, label, message) {
let labelWidth = Math.max(minimumLabelWidth, label.length);
let lines = message.split("\n");
this.log(`${color(`[${padLabel(label)}]`)} ${lines[0]}`);
lines.slice(1).forEach((line) => {
this.log(`${color(`${pad(labelWidth + 1, "")}|`)} ${line}`);
});
},
warning: function logWarning(...args) {
let message = concatenateArguments(args);
this.logWithLabel(chalk.bold.yellow, "⚠ WARNING", message);
},
error: function logError(...args) {
let message = concatenateArguments(args);
this.logWithLabel(chalk.bold.red, "✖ ERROR", message);
},
info: function logInfo(...args) {
let message = concatenateArguments(args);
this.logWithLabel(chalk.bold.cyan, "* info", message);
},
done: function done(item) {
let formatted = itemFormatter({ item });
this.logWithLabel(chalk.bold.green, "✔ done", formatted);
},
debug: function debug(...args) {
if (process.env.SCRAPER_DEBUG === "1") {
let message = concatenateArguments(args);
this.log(chalk.gray(message));
}
}
};
};
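
A short usage sketch of the logger on its own; in the scraper itself the formatter comes from backend.formatItem, and debug output only appears when the SCRAPER_DEBUG environment variable is set to "1":

const createScrapeLogger = require("./scrape-logger");

let logger = createScrapeLogger({
	name: "example-site",
	itemFormatter: ({ item }) => JSON.stringify(item.data)
});

logger.info("Scraping: front page");      // [example-site] [* info   ] Scraping: front page
logger.warning("Rate limited, retrying"); // [example-site] [⚠ WARNING] Rate limited, retrying
logger.debug("only printed when SCRAPER_DEBUG=1");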