"use strict";
const Promise = require("bluebird");
const defaultValue = require("default-value");
const createScrapeLogger = require("../scrape-logger");
const normalizeItems = require("../normalize-items");
const normalizeUrls = require("../normalize-urls");
const createDbApi = require("./db");
const runDisposableQueue = require("./disposable-queue");
const errors = require("../errors");
const itemHasPartialData = require("../item-has-partial-data");
const findScraperHandler = require("../find-scraper-handler");
module.exports = function createDbRunner(createScraper, { db }) {
let scraper = createScraper();
let logger = createScrapeLogger({ name: scraper.name });
let dbApi = createDbApi({db});
	return Promise.try(() => {
		logger.log("Initializing database...");

		/* MARKER: Switch to dbApi */
		return dbApi.getScraperId({ name: scraper.name });
	}).then((scraperId) => {
		function restoreState(url) {
			/* NOTE: This will re-fetch every ancestor of the specified URL, from the
			   least-depth to the most-depth ancestor. This will gradually reconstruct
			   any (session) state needed to correctly fetch the URL in question, and
			   continue processing the queue. This only needs to be done for the very
			   first queued URL after crash recovery, since from that point on the
			   state will be identical to before the crash. */
			return Promise.try(() => {
				return dbApi.getAncestorsForUrl({ urlId: url.id });
			}).then((ancestors) => {
				ancestors.reverse();

				return Promise.each(ancestors, (ancestorUrl) => {
					/* NOTE: We ignore the returned values here. */
					logger.debug(`Loading URL for session recovery: ${ancestorUrl.url}`);
					return runItem(ancestorUrl);
				});
			});
		}
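
		/* For example (hypothetical URLs, for illustration only): if the first queued
		   URL after a restart is a track page at depth 2, restoreState first re-fetches
		   its depth-0 ancestor (say, an artist overview page) and then its depth-1
		   ancestor (say, the album page), discarding their results, so that any session
		   state those pages establish exists again before the track page itself is
		   scraped. */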

		/* Fetches and processes a single queued URL: resolves the appropriate handler
		   for it, invokes that handler, and normalizes the resulting URLs and items. */
		function runItem(parentUrl) {
			return Promise.try(() => {
				let updatedUrl = scraper.updateUrl(parentUrl.url);
				let handler = findScraperHandler(scraper, updatedUrl);

				return Promise.try(() => {
					// logger.debug(`Scraping: ${updatedUrl}`);
					if (parentUrl.description != null) {
						logger.info(`Scraping: ${parentUrl.description}`);
						logger.debug(`URL: ${updatedUrl}`);
					} else {
						logger.info(`Scraping: ${updatedUrl}`);
					}

					return handler(updatedUrl, {
						description: parentUrl.description,
						metadata: defaultValue(parentUrl.metadata, {})
					});
				}).then((result) => {
					let normalizedUrls = normalizeUrls(result.urls, { parentUrl: parentUrl });
					let normalizedItems = normalizeItems(result.items);

					return {
						urls: normalizedUrls,
						items: normalizedItems
					};
				});
			});
		}
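
		/* For illustration, a handler as consumed by runItem above might look roughly
		   like the hypothetical sketch below. Only the call and return shape is taken
		   from this file; the handler name, the fetchAndParse helper, and the exact
		   URL/item formats accepted by ../normalize-urls and ../normalize-items are
		   assumptions.

		   function exampleAlbumHandler(url, { description, metadata }) {
		       return fetchAndParse(url).then((page) => {
		           return {
		               urls: page.trackLinks.map((link) => ({ url: link, description: "Track page" })),
		               items: [{ title: page.title, artist: page.artist }]
		           };
		       });
		   }
		*/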

		/* Processes one batch of queued URLs: every newly discovered URL is queued in
		   the database and every scraped item is stored, after which the next batch is
		   fetched and processed recursively until the queue is exhausted. */
		function runBatch(batch) {
			return Promise.try(() => {
				return runDisposableQueue(batch, (parentUrl, disposeQueue) => {
					return Promise.try(() => {
						return runItem(parentUrl);
					}).then(({ urls, items }) => {
						if (urls.some((url) => url.depth > parentUrl.depth)) {
							/* Since we always want to scrape depth-first, we always want our
							   processing queue to only contain the highest-depth items that
							   exist in the database at that time. Therefore, if we encounter
							   any new to-be-queued URLs that are going to have a higher depth
							   than the current item, we immediately throw away the entirety of
							   the queue, triggering a re-fetch from the database for a new
							   batch at the new maximum depth. */
							logger.debug(`Disposing URL queue`);
							disposeQueue();
						}
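
						/* Concretely (hypothetical numbers): if this batch was fetched at
						   depth 2 and the current URL yields links at depth 3, the rest of
						   the in-memory batch is thrown away here. Those URLs were never
						   marked as done, so they remain queued in the database, and the
						   getBatch call further down will presumably return a fresh batch
						   starting at the new maximum depth of 3. */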

						return Promise.all([
							Promise.map(urls, (url) => {
								if (url.description != null) {
									logger.debug(`Queueing new URL: ${url.description} (${url.url})`);
								} else {
									logger.debug(`Queueing new URL: ${url.url}`);
								}

								return dbApi.addUrlToQueue({
									scraperId: scraperId,
									parentUrlId: parentUrl.id,
									depth: url.depth,
									url: url.url,
									description: url.description,
									metadata: url.metadata
								});
							}),
							Promise.map(items, (item) => {
								logger.done(item);
								logger.debug(`Encountered item`, item);

								return dbApi.storeItem({
									scraperId: scraperId,
									urlId: parentUrl.id,
									data: item,
									isPartial: itemHasPartialData(item)
								});
							})
						]);
					}).then(() => {
						return dbApi.markUrlDone({ urlId: parentUrl.id });
					}).catch(errors.NoHandler, (err) => {
						/* A queued URL that no handler matches is logged and skipped; the
						   rest of the batch continues. */
						logger.error(err.message);
					});
				});
			}).then(() => {
				return dbApi.getBatch({
					scraperId: scraperId,
					amount: 20
				});
			}).then((nextBatch) => {
				if (nextBatch.length > 0) {
					return runBatch(nextBatch);
				}
			});
		}
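
		/* runDisposableQueue comes from ./disposable-queue; judging purely from how it
		   is used above, it presumably behaves roughly like the sketch below (the names
		   and details are guesses, not the actual implementation): it runs the handler
		   for each batch entry in sequence, and stops early once the dispose callback
		   has been invoked.

		   function runDisposableQueue(items, handler) {
		       let disposed = false;

		       return Promise.each(items, (item) => {
		           if (disposed) {
		               // The queue was disposed; skip the remaining entries.
		               return;
		           }

		           return handler(item, () => {
		               disposed = true;
		           });
		       });
		   }
		*/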

		return Promise.try(() => {
			return scraper.initialize();
		}).then((rootUrls) => {
			return dbApi.getRootOrBatch({
				scraperId: scraperId,
				rootUrls: normalizeUrls(rootUrls),
				amount: 20
			});
		}).then((firstBatch) => {
			if (firstBatch.length > 0) {
				return Promise.try(() => {
					if (firstBatch[0].depth > 0) {
						/* We only need to do this for URLs other than the root URLs. */
						return restoreState(firstBatch[0]);
					}
				}).then(() => {
					return runBatch(firstBatch);
				});
			} else {
				throw new Error("No URLs in queue after scraper initialization");
			}
		});

		// return Promise.try(() => {
		// 	return Promise.all([
		// 		getBatch(20),
		// 		scraper.initialize()
		// 	]);
		// }).then(([batch, rootUrls]) => {
		// 	if (batch.length === 0) {
		// 		return Promise.try(() => {
		// 			return Promise.map(assureArray(rootUrls), (rootUrl) => {
		// 				return db("urls").insert({
		// 					url: rootUrl,
		// 					depth: null,
		// 					scraper_id: scraperId
		// 				});
		// 			});
		// 		}).then(() => {
		// 			return getBatch(20);
		// 		});
		// 	} else {
		// 		/* NOTE: We ignore the root URLs if there are already unscraped URLs in the database. */
		// 		return batch;
		// 	}
		// }).then((batch) => {
		// 	/* MARKER: Implement runBatch, recursively get and process a new batch */
		// 	return runBatch(batch);
		// });
	});
};
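
/* Hypothetical usage sketch. The file paths and the knex instance below are
   assumptions for illustration; only the createDbRunner(createScraper, { db })
   signature and the returned Promise come from this module (the commented-out
   legacy code above suggests db is a knex instance).

   const knex = require("knex")(require("./knexfile"));
   const createDbRunner = require("./lib/db-runner");
   const createExampleScraper = require("./scrapers/example");

   createDbRunner(createExampleScraper, { db: knex }).then(() => {
       console.log("Scraper run finished; no queued URLs remain.");
   });
*/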