You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

58 lines
1.6 KiB

  1. const path = require('path')
  2. const Bull = require('bull')
  3. const Promise = require('bluebird')
  4. const _ = require('lodash')
  5. /* global WIKI */
  6. module.exports = {
  7. job: {},
  8. init() {
  9. _.forOwn(WIKI.data.jobs, (queueParams, queueName) => {
  10. this.job[queueName] = new Bull(queueName, {
  11. prefix: `queue`,
  12. redis: WIKI.config.redis
  13. })
  14. if (queueParams.concurrency > 0) {
  15. this.job[queueName].process(queueParams.concurrency, path.join(WIKI.SERVERPATH, `jobs/${_.kebabCase(queueName)}.js`))
  16. } else {
  17. this.job[queueName].process(path.join(WIKI.SERVERPATH, `jobs/${_.kebabCase(queueName)}.js`))
  18. }
  19. })
  20. return this
  21. },
  22. start() {
  23. _.forOwn(WIKI.data.jobs, (queueParams, queueName) => {
  24. if (queueParams.onInit) {
  25. this.job[queueName].add({}, {
  26. removeOnComplete: true
  27. })
  28. }
  29. if (queueParams.cron) {
  30. this.job[queueName].add({}, {
  31. repeat: { cron: queueParams.cron },
  32. removeOnComplete: true
  33. })
  34. }
  35. })
  36. },
  37. async clean() {
  38. return Promise.each(_.keys(WIKI.data.jobs), queueName => {
  39. return new Promise((resolve, reject) => {
  40. let keyStream = WIKI.redis.scanStream({
  41. match: `queue:${queueName}:*`
  42. })
  43. keyStream.on('data', resultKeys => {
  44. if (resultKeys.length > 0) {
  45. WIKI.redis.del(resultKeys)
  46. }
  47. })
  48. keyStream.on('end', resolve)
  49. })
  50. }).then(() => {
  51. WIKI.logger.info('Purging old queue jobs: [ OK ]')
  52. }).return(true).catch(err => {
  53. WIKI.logger.error(err)
  54. })
  55. }
  56. }