You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
|
|
const path = require('path') const Bull = require('bull') const Promise = require('bluebird') const _ = require('lodash')
/* global WIKI */
module.exports = { job: {}, init() { _.forOwn(WIKI.data.jobs, (queueParams, queueName) => { this.job[queueName] = new Bull(queueName, { prefix: `queue`, redis: WIKI.config.redis }) if (queueParams.concurrency > 0) { this.job[queueName].process(queueParams.concurrency, path.join(WIKI.SERVERPATH, `jobs/${_.kebabCase(queueName)}.js`)) } else { this.job[queueName].process(path.join(WIKI.SERVERPATH, `jobs/${_.kebabCase(queueName)}.js`)) } }) return this }, start() { _.forOwn(WIKI.data.jobs, (queueParams, queueName) => { if (queueParams.onInit) { this.job[queueName].add({}, { removeOnComplete: true }) } if (queueParams.cron) { this.job[queueName].add({}, { repeat: { cron: queueParams.cron }, removeOnComplete: true }) } }) }, async quit() { for (const queueName in this.job) { await this.job[queueName].close() } }, async clean() { return Promise.each(_.keys(WIKI.data.jobs), queueName => { return new Promise((resolve, reject) => { let keyStream = WIKI.redis.scanStream({ match: `queue:${queueName}:*` }) keyStream.on('data', resultKeys => { if (resultKeys.length > 0) { WIKI.redis.del(resultKeys) } }) keyStream.on('end', resolve) }) }).then(() => { WIKI.logger.info('Purging old queue jobs: [ OK ]') }).return(true).catch(err => { WIKI.logger.error(err) }) } }
|