You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

63 lines
1.7 KiB

  1. const path = require('path')
  2. const Bull = require('bull')
  3. const Promise = require('bluebird')
  4. const _ = require('lodash')
  5. /* global WIKI */
  6. module.exports = {
  7. job: {},
  8. init() {
  9. _.forOwn(WIKI.data.jobs, (queueParams, queueName) => {
  10. this.job[queueName] = new Bull(queueName, {
  11. prefix: `queue`,
  12. redis: WIKI.config.redis
  13. })
  14. if (queueParams.concurrency > 0) {
  15. this.job[queueName].process(queueParams.concurrency, path.join(WIKI.SERVERPATH, `jobs/${_.kebabCase(queueName)}.js`))
  16. } else {
  17. this.job[queueName].process(path.join(WIKI.SERVERPATH, `jobs/${_.kebabCase(queueName)}.js`))
  18. }
  19. })
  20. return this
  21. },
  22. start() {
  23. _.forOwn(WIKI.data.jobs, (queueParams, queueName) => {
  24. if (queueParams.onInit) {
  25. this.job[queueName].add({}, {
  26. removeOnComplete: true
  27. })
  28. }
  29. if (queueParams.cron) {
  30. this.job[queueName].add({}, {
  31. repeat: { cron: queueParams.cron },
  32. removeOnComplete: true
  33. })
  34. }
  35. })
  36. },
  37. async quit() {
  38. for (const queueName in this.job) {
  39. await this.job[queueName].close()
  40. }
  41. },
  42. async clean() {
  43. return Promise.each(_.keys(WIKI.data.jobs), queueName => {
  44. return new Promise((resolve, reject) => {
  45. let keyStream = WIKI.redis.scanStream({
  46. match: `queue:${queueName}:*`
  47. })
  48. keyStream.on('data', resultKeys => {
  49. if (resultKeys.length > 0) {
  50. WIKI.redis.del(resultKeys)
  51. }
  52. })
  53. keyStream.on('end', resolve)
  54. })
  55. }).then(() => {
  56. WIKI.logger.info('Purging old queue jobs: [ OK ]')
  57. }).return(true).catch(err => {
  58. WIKI.logger.error(err)
  59. })
  60. }
  61. }