You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

172 lines
5.6 KiB
JavaScript

"use strict";
const Promise = require("bluebird");
const databaseError = require("database-error");
const assureArray = require("assure-array");
const defaultValue = require("default-value");
const createScrapeLogger = require("./scrape-logger");
const normalizeItems = require("./normalize-items");
const normalizeUrls = require("./normalize-urls");
const runDisposableQueue = require("./disposable-queue");
const errors = require("./errors");
const itemHasPartialData = require("./item-has-partial-data");
const findScraperHandler = require("./find-scraper-handler");
module.exports = function createScraper({ backend, scraper }) {
if (backend == null || scraper == null) {
throw new Error("Must specify both a backend and a scraper");
} else {
let logger = createScrapeLogger({ name: scraper.name, itemFormatter: backend.formatItem });
function addRootUrl({ url, description, metadata }) {
return backend.addUrl({ url, description, metadata, depth: 0 });
}
function getRootOrBatch({ amount, rootUrls }) {
return Promise.try(() => {
return backend.getBatch({ amount });
}).then((results) => {
if (results.length > 0) {
return results;
} else {
return Promise.try(() => {
return Promise.map(assureArray(rootUrls), (rootUrl) => {
return addRootUrl({
url: rootUrl.url,
description: rootUrl.description
});
});
}).then(() => {
return backend.getBatch({ amount });
});
}
}).catch(databaseError.rethrow);
}
function restoreState(url) {
/* NOTE: This will re-fetch every ancestor of the specified URL, from the least-depth to the most-depth ancestor. This will gradually reconstruct any (session) state needed to correctly fetch the URL in question, and continue processing the queue. This only needs to be done for the very first queued URL after crash recovery, since from that point on the state will be identical to before the crash. */
return Promise.try(() => {
return backend.getAncestorsForUrl({ urlId: url.id });
}).then((ancestors) => {
ancestors.reverse();
return Promise.each(ancestors, (ancestorUrl) => {
/* NOTE: We ignore the returned values here. */
logger.debug(`Loading URL for session recovery: ${ancestorUrl.url}`)
return runItem(ancestorUrl);
});
});
}
function runItem(parentUrl) {
return Promise.try(() => {
let updatedUrl = scraper.updateUrl(parentUrl.url);
let handler = findScraperHandler(scraper, updatedUrl);
return Promise.try(() => {
// logger.debug(`Scraping: ${updatedUrl}`);
if (parentUrl.description != null) {
logger.info(`Scraping: ${parentUrl.description}`);
logger.debug(`URL: ${updatedUrl}`)
} else {
logger.info(`Scraping: ${updatedUrl}`);
}
return handler(updatedUrl, {
description: parentUrl.description,
metadata: defaultValue(parentUrl.metadata, {})
});
}).then((result) => {
let normalizedUrls = normalizeUrls(result.urls, { parentUrl: parentUrl });
let normalizedItems = normalizeItems(result.items);
return {
urls: normalizedUrls,
items: normalizedItems
};
});
});
}
function runBatch(batch) {
return Promise.try(() => {
return runDisposableQueue(batch, (parentUrl, disposeQueue) => {
return Promise.try(() => {
return runItem(parentUrl);
}).then(({urls, items}) => {
if (urls.some((url) => url.depth > parentUrl.depth)) {
/* Since we always want to scrape depth-first, we always want our processing queue to only contain the highest-depth items that exist in the backing store at that time. Therefore, if we encounter any new to-be-queued URLs that are going to have a higher depth than the current item, we immediately throw away the entirety of the queue, triggering a re-fetch from the backing store for a new batch at the new maximum depth. */
logger.debug(`Disposing URL queue`);
disposeQueue();
}
return Promise.all([
Promise.map(urls, (url) => {
if (url.description != null) {
logger.debug(`Queueing new URL: ${url.description} (${url.url})`);
} else {
logger.debug(`Queueing new URL: ${url.url}`);
}
return backend.addUrl({
parentUrlId: parentUrl.id,
depth: url.depth,
url: url.url,
description: url.description,
metadata: url.metadata
});
}),
Promise.map(items, (item) => {
logger.done(item);
logger.debug(`Encountered item`, item);
return backend.storeItem({
parentUrlId: parentUrl.id,
data: item,
isPartial: itemHasPartialData(item)
});
})
]);
}).then(() => {
return backend.markUrlDone({ urlId: parentUrl.id });
}).catch(errors.NoHandler, (err) => {
logger.error(err.message);
});
});
}).then(() => {
return backend.getBatch({
amount: 20
});
}).then((nextBatch) => {
if (nextBatch.length > 0) {
return runBatch(nextBatch);
}
});
}
return function startScraper() {
return Promise.try(() => {
return scraper.initialize();
}).then((rootUrls) => {
return getRootOrBatch({
rootUrls: normalizeUrls(rootUrls),
amount: 20
});
}).then((firstBatch) => {
if (firstBatch.length > 0) {
return Promise.try(() => {
if (firstBatch[0].depth > 0) {
/* We only need to do this for URLs other than the root URLs. */
return restoreState(firstBatch[0]);
}
}).then(() => {
return runBatch(firstBatch);
});
} else {
throw new Error("No URLs in queue after scraper initialization");
}
});
}
}
};