'use strict'; const Promise = require("bluebird"); const bhttp = require("bhttp"); const promiseTaskQueue = require("promise-task-queue"); const createEventEmitter = require("create-event-emitter"); const defaultValue = require("default-value"); const debug = require("debug")("pastebinStream:scrapers:pastebinCom"); const promiseSetInterval = require("../promise-set-interval"); const errors = require("../errors"); function tryParseBody(body) { try { return JSON.parse(body); } catch (err) { throw new errors.HttpError(`Got rate-limited? Error message: ${body}`, {type: "rateLimited"}); } } module.exports = function createPastebinComScraper(options = {}) { let queue = promiseTaskQueue(); let knownPastes = []; let previousKnownPastes = []; queue.define("fetchPaste", (task) => { return Promise.try(() => { return bhttp.get(`http://pastebin.com/api_scrape_item.php?i=${task.pasteKey}`); }).then((response) => { if (response.statusCode !== 200) { // FIXME: Retry! throw new errors.HttpError(`Encountered a non-200 status code for Pastebin.com while retrieving a raw paste: ${response.statusCode}`, {type: "statusCode"}); } else if (response.body.toString().indexOf("Please slow down, you are hitting our servers unnecessarily hard!") === 0) { throw new errors.HttpError("Got rate-limited!", {type: "rateLimited"}); } else { return response.body.toString(); } }) }, { interval: options.pasteInterval }); let loop; return createEventEmitter({ stop: function stopScraper() { if (loop != null) { this.emit("stopped"); loop.cancel(); } }, start: function startScraper() { loop = promiseSetInterval(() => { return Promise.try(() => { return bhttp.get(`http://pastebin.com/api_scraping.php?limit=${defaultValue(options.listLimit, 100)}`, { noDecode: true /* Because Pastebin.com errors aren't JSON... */ }); }).then((response) => { if (response.statusCode !== 200) { throw new errors.HttpError(`Encountered a non-200 status code for Pastebin.com while listing the most recent pastes: ${response.statusCode}`, {type: "statusCode"}); } else { return tryParseBody(response.body).reverse(); } }).tap((pastes) => { previousKnownPastes = knownPastes; knownPastes = pastes.map(paste => paste.key); }).filter((paste) => { return (!previousKnownPastes.includes(paste.key)); }).tap((pastes) => { debug(`Found ${pastes.length} new pastes`); }).each((paste) => { /* We *intentionally* do not return the Promise chain below; we don't want to block the interval with queue items. */ Promise.try(() => { return queue.push("fetchPaste", { pasteKey: paste.key }); }).then((rawPaste) => { this.emit("paste", Object.assign({ raw: rawPaste }, paste)); }).catch((err) => { this.emit("error", err); }); }).catch((err) => { /* This is where eg. rate-limiting errors will end up. */ this.emit("error", err); }); }, defaultValue(options.listInterval, 60) * 1000, { startImmediately: true }); } }); };