You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

120 lines
2.4 KiB

  1. const moment = require('moment')
  2. const childProcess = require('child_process')
  3. const _ = require('lodash')
  4. const configHelper = require('../helpers/config')
  5. /* global WIKI */
  6. class Job {
  7. constructor({
  8. name,
  9. immediate = false,
  10. schedule = 'P1D',
  11. repeat = false,
  12. worker = false
  13. }) {
  14. this.finished = Promise.resolve()
  15. this.name = name
  16. this.immediate = immediate
  17. this.schedule = moment.duration(schedule)
  18. this.repeat = repeat
  19. this.worker = worker
  20. }
  21. /**
  22. * Start Job
  23. *
  24. * @param {Object} data Job Data
  25. */
  26. start(data) {
  27. if (this.immediate) {
  28. this.invoke(data)
  29. } else {
  30. this.queue(data)
  31. }
  32. }
  33. /**
  34. * Queue the next job run according to the wait duration
  35. *
  36. * @param {Object} data Job Data
  37. */
  38. queue(data) {
  39. this.timeout = setTimeout(this.invoke.bind(this), this.schedule.asMilliseconds(), data)
  40. }
  41. /**
  42. * Run the actual job
  43. *
  44. * @param {Object} data Job Data
  45. */
  46. async invoke(data) {
  47. try {
  48. if (this.worker) {
  49. const proc = childProcess.fork(`server/core/worker.js`, [
  50. `--job=${this.name}`,
  51. `--data=${data}`
  52. ], {
  53. cwd: WIKI.ROOTPATH
  54. })
  55. this.finished = new Promise((resolve, reject) => {
  56. proc.on('exit', (code, signal) => {
  57. if (code === 0) {
  58. resolve()
  59. } else {
  60. reject(signal)
  61. }
  62. proc.kill()
  63. })
  64. })
  65. } else {
  66. this.finished = require(`../jobs/${this.name}`)(data)
  67. }
  68. await this.finished
  69. } catch (err) {
  70. WIKI.logger.warn(err)
  71. }
  72. if (this.repeat) {
  73. this.queue(data)
  74. }
  75. }
  76. /**
  77. * Stop any future job invocation from occuring
  78. */
  79. stop() {
  80. clearTimeout(this.timeout)
  81. }
  82. }
  83. module.exports = {
  84. jobs: [],
  85. init() {
  86. return this
  87. },
  88. start() {
  89. _.forOwn(WIKI.data.jobs, (queueParams, queueName) => {
  90. const schedule = (configHelper.isValidDurationString(queueParams.schedule)) ? queueParams.schedule : _.get(WIKI.config, queueParams.schedule)
  91. this.registerJob({
  92. name: _.kebabCase(queueName),
  93. immediate: queueParams.onInit,
  94. schedule: schedule,
  95. repeat: true
  96. })
  97. })
  98. },
  99. registerJob(opts, data) {
  100. const job = new Job(opts)
  101. job.start(data)
  102. if (job.repeat) {
  103. this.jobs.push(job)
  104. }
  105. return job
  106. },
  107. stop() {
  108. this.jobs.forEach(job => {
  109. job.stop()
  110. })
  111. }
  112. }