You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

54 lines
1.4 KiB

  1. const path = require('path')
  2. const Bull = require('bull')
  3. const Promise = require('bluebird')
  4. const _ = require('lodash')
  5. /* global WIKI */
  6. module.exports = {
  7. job: {},
  8. init() {
  9. _.forOwn(WIKI.data.jobs, (queueParams, queueName) => {
  10. this.job[queueName] = new Bull(queueName, {
  11. prefix: `q-${WIKI.config.ha.uid}`,
  12. redis: WIKI.config.redis
  13. })
  14. this.job[queueName].process(path.join(WIKI.SERVERPATH, `jobs/${_.kebabCase(queueName)}.js`))
  15. })
  16. return this
  17. },
  18. start() {
  19. _.forOwn(WIKI.data.jobs, (queueParams, queueName) => {
  20. if (queueParams.onInit) {
  21. this.job[queueName].add({}, {
  22. removeOnComplete: true
  23. })
  24. }
  25. if (queueParams.cron) {
  26. this.job[queueName].add({}, {
  27. repeat: { cron: queueParams.cron },
  28. removeOnComplete: true
  29. })
  30. }
  31. })
  32. },
  33. async clean() {
  34. return Promise.each(_.keys(WIKI.data.jobs), queueName => {
  35. return new Promise((resolve, reject) => {
  36. let keyStream = WIKI.redis.scanStream({
  37. match: `q-${WIKI.config.ha.uid}:${queueName}:*`
  38. })
  39. keyStream.on('data', resultKeys => {
  40. if (resultKeys.length > 0) {
  41. WIKI.redis.del(resultKeys)
  42. }
  43. })
  44. keyStream.on('end', resolve)
  45. })
  46. }).then(() => {
  47. WIKI.logger.info('Purging old queue jobs: [ OK ]')
  48. }).return(true).catch(err => {
  49. WIKI.logger.error(err)
  50. })
  51. }
  52. }