FIXME:
- maximumFailure threshold for a task type, after which the task gets stopped until the issue is resolved, to avoid unnecessary requests when eg. the parsing after it fails
- log [updated], not just [new], and also other operations
- add simulation for addData, addMetadata
- task dependencies... CTE for tasks listing their completed-and-valid status, and then use that for both selection of to-do tasks and excluding those where dependencies have not been satisfied yet?

- each item has zero or more tags
    - tags may (but do not have to) contain a value
    - tags define which processing tasks must be invoked for an item
- tasks have full access to the item data, and may request its metadata for other tasks (though there is no guarantee of order of execution -- maybe there should be?)
- task results may be any of the following: (OUTDATED: see API methods below instead)
    - data updates (in which case they are merged into the item, using a user-specified merging callback)
        - when using a TTL, data updates are useful for data which is allowed to go stale but should eventually be updated
        - revisions are NOT kept for data merges! data overrides are permanent and final.
    - new items (in which case the new items are created, but only if the specified ID does not yet exist)
    - external item upserts (in which case the original item is fetched, if any, and passed to a user-specified merging callback, defaulting to empty object if nonexistent)
    - metadata (in which case the task result is stored separately, addressable by a combination of item ID + task name)
        - when using a TTL, metadata is useful for data which should only be taken into consideration for as long as it is fresh (as the separate storage allows for identifying the specific data that has expired)
        - likewise, expired task result metadata is not kept; it's overwritten with the new metadata instead
    - "delete item" (mutually exclusive with metadata and data updates) -- meant to be used for ephemeral items, like pagination URLs in a category listing
    - "expire task results" (addressed by ID + operation, default to own ID for "we immediately know this data is stale but we want to prioritize un-processed items first" cases? as well as for "retry because this failed" cases, which are always explicit)
    - "create alias" (addressed by ID, default to own ID for "we've discovered an alias for this item")
    - "merge item" (addressed by old and new ID)
        - repoints any aliases
        - takes user-supplied callback for merging item data and task result metadata; for any of those for which no callback is supplied, it simply picks the newest result
- when in doubt, use metadata, not data updates
    - data updates should typically only really be used for cases where the actual item data is collected across multiple tasks (eg. because some sites require parsing multiple pages to get all of the data) *or* potentially multiple tasks need access to it (eg. the URL to scrape)
    - it may also be useful to use metadata to track the source of certain pieces of data
    - in some cases it doesn't matter whether data or metadata is used, and it's up to the user which one to pick.
      we should document the exact tradeoffs to help along this process.
- task results can expire, either via TTL or manually
- items can be manually added, eg. for seeding purposes
- each task can be rate-limited individually, either for a specific tag / set of tags, or globally
- item ID is either randomly generated or explicitly specified; the latter mode is also used for randomly generated IDs that have a prefix
- while tags determine what tasks to invoke on an object, the actual task results are associated with the object, not with the tag, because multiple tags may mark the same object for the same task. the tag-task association *only* serves to build a list of outstanding tasks
- TODO: eventually support binary data storage for items as well, associated with a given item
- TO FIGURE OUT: session re-establishing, like in scrappie, and how this fits into the model
- TO FIGURE OUT: how to deal with cases where it is belatedly realized that a new item should actually just be an alias to a previous item
- TO FIGURE OUT: how to handle tag renaming? this would be a stateful operation, that involves a change in the existing state in addition to a configuration change. need to figure out a UI for renaming a tag that *doesn't* run the risk of a split-brain situation when the process isn't restarted with a new config at exactly the same moment as the state gets changed.
    - maybe a special boot mode that changes the state prior to starting the scraping processes?
    - or let the user specify the rename in the configuration, and use some sort of version number to distinguish "old tag X" from "new tag X" when a tag name X gets reused?
    - or maybe just keep it simple, let the user specify tag renames in the configuration, and just apply that rename to the full DB on every start of the process...
      then expect users to remove it from the renames again once they want to reuse a tag
- TO CONSIDER: if the TTL for a task gets changed, should existing task results be left at their old expiry time, or get updated to the new expiry time?
- TODO: allow marking a failure as 'handled'
- let the user define *how* to merge the data, by letting them specify a callback that receives the old data as an argument
- an item can be aliased, for handling items which have multiple canonical identifiers (eg. an internal SKU and the URL of a product page)
    - any task which references item IDs can then also use the alias in its stead
    - the item name itself is added as an alias to itself, to make queries simpler, as they can then just always do a lookup in the aliases table + it ensures that in-use item IDs cannot be created as an alias
        - ... however, the item name is still treated as the primary/canonical identifier for display purposes, with aliases being listed separately, and so it should usually be the 'most canonical' identifier (eg. an SKU instead of a product page URL)
        - when dealing with multiple namespaces (eg. SKU, EAN, URL), each identifier should ideally be prefixed as such, as in EAN:20060503
        - because of this, we need to make sure to return all of these identifiers in the item lookup response as well, as the client may have gotten there through an alias, and so does not necessarily know the canonical identifier!
    - aliases need to be mutable, to deal with eg. item merges
    - aliases can only be single-level; any collapsing needs to be done at alias-mutation-time
- a task failing DOES NOT clear out the existing metadata+TTL, if any, for the item. likewise, new metadata (with a corresponding TTL) *may* have been set before the task failed, and this will be respected. failed items will instead be ignored by the worklist query, until the item is marked as un-failed.
- each task result stores the version of the task with which it was produced; changing the task version therefore automatically marks all entries for the old version as stale (and that is the only case where the version *should* be bumped)
- let the user specify for each task whether results should be applied or discarded upon task failure?
- seed items should be inserted conditionally, only if the ID does not already exist. for this reason, specifying an ID is mandatory for seed items.
- strongly recommend to the user that they give everything an explicit ID, because this is crucial for deduplicating items, and preventing duplicate operations from being run for tasks that have a TTL *and* where the items exist in the database multiple times due to randomly-generated IDs
- TO CONSIDER: for distributed scraping, what strategy to use?
    - 1. execute task itself on worker node, but send back a log of desired changes, and assume that the code is the same on both sides, using the same callbacks on the central server end, in some way
    - 2. track which data/metadata tasks tend to request, and if this occurs for more than a certain percentage of task executions, pre-fetch that data and send it along when sending the task data itself to the worker
    - 3. something else?
- there are global rate limits, and then the user can also override those on a per-task basis, which will make the task subject to *both* the global ratelimit *and* the task-specific ratelimit
    - allow the user to explicitly specify that they want the task exempted from the global ratelimit?
    - also possible to specify a rate limit for a group of tasks, eventually, to eg. group together tasks that target the same website (including wildcard matching task names)
- TO CONSIDER: are tags that can contain values actually useful, when data can already be stored in the item data itself + the task metadata?
- TO FIGURE OUT: log method with log levels?
- TO IMPLEMENT: it must be possible for an item to set its created_by explicitly; for example, if item A produces ephemeral work item B which then results in item C, that item C should be able to claim to have been produced by item A, so that B can be safely deleted without breaking the reference chain
- "max outstanding failures for task" option before completely disabling the task, eg. to deal with upstream site changes
    - plus a "mark all failures handled" operation for the task type, once it's been resolved

ITEMS
    id (string)
    data (json)
    created_by -- references an item ID, *via* an alias (to handle item merges), and this is allowed to be a broken reference!
    created_at
    updated_at

ALIASES
    id (string)
    item_id

TAGS
    id (num)
    item_id
    tag_name (string)
    value (opt.)

TASK_RESULTS
    id (num)
    task (string, name)
    task_version (string)
    item_id
    metadata (json)
    is_successful (boolean) -- set to false when an error occurred during execution of the operation
    is_invalidated (boolean)
    created_at
    expires_at
    UNIQUE (task, item_id)

TASKS_IN_PROGRESS
    item_id
    task (string)
    UNIQUE (task, item_id)

TASKS
    not stored in DB but defined in configuration, and keyed by name

FAILURES
    id (num)
    task_result_id
    error_data (json)
    occurred_at

API endpoints
=============

GET /items/:id
    -- get current item data + metadata about it

POST /items/add
    tags (opt.): string[]
    data (req.): json

DELETE /items/:id

GET /items/:id/metadata
    ?operations: string[] -- operations to fetch the metadata for

GET /items/:id/operations
    -- operations that apply to this item, and the status of each (incl. expiry and whether it was successful)

POST /items/:id/operations/:operation/expire
    -- forcibly mark as expired

POST /items/:id
    -- ???

GET /tags/:tagname/worklist
    ?limit: integer -- maximum amount of items in the worklist to fetch

Task arguments/methods
======================

data
    The currently-known item data for the item we're processing

id
    The canonical ID of the item currently being processed.
    NOTE: Instruct the user that this should *not* encode things like a URL to fetch, and that should be part of the item data instead. It's only meant for things like "should we rename this item from a URL to an SKU", for example.

getItem(id)
    Asynchronously fetches the item data for the specified item - only typically used for items *other* than the one we're currently processing

getMetadata(taskName)
getMetadata({ id?, taskName })
    Asynchronously fetches the stored metadata, if any, for the specified taskName - either for the item we're processing, or for a specified item ID.

createItem({ id?, tags?, data, failIfExists? })
    Queues the creation of a new item, ignoring the create operation if the item already exists
    NOTE: User can specify explicitly that it should fail with an error if the item already exists, but that's not usually recommended - instead, scraper code should robustly deal with discarded creates, which can also be an important implementation detail for performance in a distributed scraping cluster.

renameItem(to)
renameItem({ from?, to })
    Changes the primary item ID for the specified (or current) item, updating all alias pointers to point at the new `to` ID.

mergeItem({ from?, into, merge: (intoOldData, fromOldData) => intoNewData, mergeMetadata: { [taskName]?: merger } })
    Merges the specified (or current) item into the specified other item, using a user-supplied merging callback. Note that intoOldData comes first as a callback argument, because the user will almost always need that data, but only sometimes needs the existing data (if any) of the to-be-merged item, which will be subsequently deleted.
    NOTE: This also changes all the alias pointers from the `from` to the `into`, *including* the ID of the to-be-deleted `from` item (which the user can separately delete using `deleteAlias`, if they want to). Essentially, it leaves behind a redirect.
    NOTE: For any tasks which do not have a merger specified, it takes whichever set of metadata is most recent

deleteItem()
deleteItem({ id? })
    Deletes the specified (or current) item from the database.

createAlias({ from, to?, failIfExists? })
    Creates a new alias from the given `from` identifier to the specified `to` (or current, if unspecified) item. Silently fails if the `from` already exists, unless `failIfExists` is true.
    NOTE: The `to` will be looked up in the aliases table rather than being assumed to be an item ID, to allow for collapsing alias pointers.

deleteAlias(from)
    Deletes the specified alias. Fails with an error if the alias points to itself (and so is an item ID rather than a 'real' alias).

updateData((oldData) => newData)
updateData({ id?, merge: (oldData) => newData })
    Queues a data update for the current or specified item, invoking a callback with the previous data as the argument

updateMetadata((oldMetadata) => newMetadata)
updateMetadata({ id?, taskName?, merge: (oldData) => newData })
    Queues a metadata update for the current or specified task, for the current or specified item, invoking a callback with the previous metadata as the argument
    NOTE: Always invoked, even if there's no old data in the database, in which case the `oldData` argument is an empty object. That way the user usually only needs to specify a single piece of logic to handle both fresh and updated metadata
    NOTE: This should rarely be used for a taskName other than the current task! Usually data collected across multiple tasks should be stored in the item data instead. Modifying another task's result will *not* result in its TTL being reset.

expire()
expire({ id?, taskName? })
    Marks the task result of the specified (or current) task as expired/invalidated, essentially immediately re-queueing it for task execution.

setTTL(ttl)
setTTL({ id?, taskName?, ttl })
    Changes the TTL for the task results of the specified (or current) item and taskName.
    Note that this overrides the default TTL for the task, and is computed from the time of the last task result, *not* from the current time.

allowFailure(allowed)
    Marks the current task as "allowed to fail" (the default when called without an argument) or "not allowed to fail" (if `false` is passed). "Failing" means any synchronously thrown error or rejected Promise. In all cases, failures are still recorded and items are marked for re-processing - this method only controls whether the task result is discarded or stored. A task that never calls this method defaults to disallowing failure, because an error can result in corrupted data, but the user may wish to store data anyway in specific cases where they know that the data is valid despite the error.

log(category, message)
    Logs a message. The category can be freely specified, but categories with built-in formatting/visibility rules are `error`, `warn`, `info`, and `debug`. Other categories may be hidden by default.

///
- data updates (in which case they are merged into the item, using a user-specified merging callback)
    - when using a TTL, data updates are useful for data which is allowed to go stale but should eventually be updated
    - revisions are NOT kept for data merges! data overrides are permanent and final.
- new items (in which case the new items are created, but only if the specified ID does not yet exist)
- item upserts (in which case the original item is fetched, if any, and passed to a user-specified merging callback, defaulting to empty object if nonexistent)
- metadata (in which case the task result is stored separately, addressable by a combination of item ID + task name)
    - when using a TTL, metadata is useful for data which should only be taken into consideration for as long as it is fresh (as the separate storage allows for identifying the specific data that has expired)
    - likewise, expired task result metadata is not kept; it's overwritten with the new metadata instead
- "delete item" (mutually exclusive with metadata and data updates) -- meant to be used for ephemeral items, like pagination URLs in a category listing
- "expire task results" (addressed by ID + operation, default to own ID for "we immediately know this data is stale but we want to prioritize un-processed items first" cases?
  as well as for "retry because this failed" cases, which are always explicit)
- "create alias" (addressed by ID, default to own ID for "we've discovered an alias for this item")
- "merge item" (addressed by old and new ID)
    - repoints any aliases
    - takes user-supplied callback for merging item data and task result metadata; for any of those for which no callback is supplied, it simply picks the newest result
///

Task recipes
============

During the task, one or more new items are discovered
    Loop, create new item

During the task, we discover that some of our metadata from another task has gone stale
    Expire other task's results on the item (upon discovery)

After completing this task, schedule a re-run of another dependent task
    Expire other task's results on the item (always)

We've received stale data, and want to immediately schedule a new future run
    Expire active task's results on the item

We've found other previously-processed items that should be merged into this one
    Find/fetch other items, update current item with new data, repoint aliases from other IDs to self, delete other items

We're currently processing an item that we belatedly discover we already know about under another ID
    Update other item with new data, delete self, create alias from own ID to other ID

TO FIGURE OUT: How do we store metadata for another item? Do we need an explicit storeTaskResultFor method to store the current result but associated with another item ID instead?
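
The merge and alias recipes above only work if alias pointers stay single-level. A minimal in-memory sketch of that bookkeeping follows, assuming a hypothetical `AliasTable` class (the name and its methods are illustrations, not part of the design above). Every item ID is registered as an alias to itself, `createAlias` resolves its target through the table first, and `mergeItem` repoints all aliases of the merged item at mutation time:

```typescript
// Hypothetical in-memory stand-in for the ALIASES table: alias -> canonical item ID.
class AliasTable {
    private aliases = new Map<string, string>();

    // Every item ID is registered as an alias to itself.
    addItem(id: string): void {
        if (this.aliases.has(id)) throw new Error(`ID already in use: ${id}`);
        this.aliases.set(id, id);
    }

    // Resolves an ID or alias to the canonical item ID in a single lookup.
    resolve(idOrAlias: string): string | undefined {
        return this.aliases.get(idOrAlias);
    }

    // `to` is looked up in the alias table first, so pointers never chain.
    // Returns false (a discarded create) if `from` already exists.
    createAlias(from: string, to: string): boolean {
        if (this.aliases.has(from)) return false;
        const target = this.resolve(to);
        if (target === undefined) throw new Error(`Unknown target: ${to}`);
        this.aliases.set(from, target);
        return true;
    }

    // Repoints every alias of `from` (including its own ID) to `into`,
    // collapsing at mutation time and leaving a redirect behind.
    mergeItem(from: string, into: string): void {
        const source = this.resolve(from);
        const target = this.resolve(into);
        if (source === undefined || target === undefined) throw new Error("Unknown item");
        this.aliases.forEach((itemId, alias) => {
            if (itemId === source) this.aliases.set(alias, target);
        });
    }
}

const table = new AliasTable();
table.addItem("SKU:1234");
table.addItem("URL:https://example.com/p/1234");
table.createAlias("EAN:20060503", "URL:https://example.com/p/1234");
table.mergeItem("URL:https://example.com/p/1234", "SKU:1234");
const resolvedEan = table.resolve("EAN:20060503");
const resolvedUrl = table.resolve("URL:https://example.com/p/1234");
```

After the merge, both the EAN alias and the old URL identifier resolve to the SKU item in one lookup, which is the property the lookup queries rely on. Merging the item data and task result metadata themselves is left out of this sketch.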
Queries
=======

Get work list for tag+operation:

    select from items
        (join tags required-on item_id = item.id and tag = :tag,
         results on item_id = item.id and operation = :operation)
    and (item.id, :operation) not in operations_in_progress
    and result is null or result.expires_at < current_time or is_invalidated = true
    limit 10

    -- NOTE: Prioritizes (item,operation) pairs that do not have any result at all, only considering stale pairs after the unprocessed pairs run out
    WITH candidates AS (
        SELECT DISTINCT ON (items.id)
            items.*,
            results.expires_at,
            results.is_invalidated,
            results.is_successful,
            results.task_version,
            results.id AS result_id
        FROM items
        INNER JOIN tags
            ON tags.item_id = items.id
            AND tags.tag_name IN :tags
        LEFT JOIN task_results AS results
            ON results.item_id = items.id
            AND results.task = :operation
        WHERE NOT EXISTS (
            SELECT FROM tasks_in_progress AS pr
            WHERE pr.item_id = items.id
            AND pr.task = :operation
        )
    )
    -- unprocessed pairs (priority 0) sort before stale pairs (priority 1)
    SELECT *, 0 AS priority FROM candidates
        WHERE result_id IS NULL
    UNION ALL
    SELECT *, 1 AS priority FROM candidates
        WHERE is_successful = TRUE
        AND (
            expires_at < NOW()
            OR is_invalidated = TRUE
            OR NOT (task_version = :taskVersion)
        )
    ORDER BY priority;

Get failures for operation type:

    SELECT failures.* FROM failures
    INNER JOIN task_results
        ON failures.task_result_id = task_results.id
    WHERE task_results.task = :operation

==================

task_states
    all task_results, with is_candidate precalculated

dependency_task_states
    all task_states for tasks+versions listed in :dependencyTasks

candidates (items which are *permitted* to be run)
    all tag-relevant items which are not listed as "in progress", and for which all dependency tasks have been satisfied
    all dependency tasks have been satisfied = NOT EXISTS (
        join(dependencyTasks, task_states on task/version and where item_id)
        where is_candidate is null or true
    )

results (items which *should* be run, in order of preference)
    all candidates (item + task_result) for which there is either:
        1. no entry in task_states
        2. an is_candidate=TRUE entry in task_states
        3. an entry in task_states with mismatching task_version
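
The selection rules above can be traced on plain in-memory data before settling on a SQL shape. The following is a minimal sketch, not the intended implementation: `TaskState`, `TaskConfig`, `needsRun` and `selectWork` are hypothetical names, and it assumes a simplified reading of `is_candidate` in which a stored result needs a re-run when it is expired, invalidated, or produced by a different task version. It also assumes that a failed dependency result does not count as satisfied, and that failed results for the task itself are skipped, as in the work list query.

```typescript
interface TaskState {
    itemId: string;
    task: string;
    taskVersion: string;
    isSuccessful: boolean;
    isInvalidated: boolean;
    expiresAt: number; // epoch milliseconds
}

interface TaskConfig {
    name: string;
    version: string;
    dependencies: { name: string; version: string }[];
}

// Simplified assumption: a stored result is stale if invalidated, expired, or from another version.
function needsRun(state: TaskState, version: string, now: number): boolean {
    return state.isInvalidated || state.expiresAt < now || state.taskVersion !== version;
}

function selectWork(
    itemIds: string[],       // items already filtered by tag
    states: TaskState[],
    inProgress: Set<string>, // keys of the form `${itemId}/${task}`
    task: TaskConfig,
    now: number
): string[] {
    const find = (itemId: string, name: string) =>
        states.find((s) => s.itemId === itemId && s.task === name);

    const unprocessed: string[] = [];
    const stale: string[] = [];
    for (const itemId of itemIds) {
        if (inProgress.has(`${itemId}/${task.name}`)) continue;
        // A dependency is satisfied only by a successful, non-stale result.
        const satisfied = task.dependencies.every((dep) => {
            const depState = find(itemId, dep.name);
            return depState !== undefined && depState.isSuccessful && !needsRun(depState, dep.version, now);
        });
        if (!satisfied) continue;
        const own = find(itemId, task.name);
        if (own === undefined) unprocessed.push(itemId);
        else if (own.isSuccessful && needsRun(own, task.version, now)) stale.push(itemId);
    }
    // Unprocessed pairs come first; stale pairs are only considered after them.
    return [...unprocessed, ...stale];
}

const now = 1000000;
const fresh = { taskVersion: "1", isSuccessful: true, isInvalidated: false, expiresAt: now + 1000 };
const states: TaskState[] = [
    { itemId: "a", task: "fetch", ...fresh },
    { itemId: "a", task: "parse", ...fresh, expiresAt: now - 1 }, // expired own result
    { itemId: "b", task: "fetch", ...fresh },
    { itemId: "c", task: "fetch", ...fresh, isInvalidated: true }, // stale dependency
];
const parse: TaskConfig = { name: "parse", version: "1", dependencies: [{ name: "fetch", version: "1" }] };
const workList = selectWork(["a", "b", "c", "d"], states, new Set<string>(), parse, now);
const workListWhileBusy = selectWork(["a", "b"], states, new Set<string>(["b/parse"]), parse, now);
```

In this example item `b` has never been parsed and so sorts ahead of item `a`, whose parse result has expired. Items `c` and `d` are held back because their `fetch` dependency is stale or missing.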