You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

125 lines
2.7 KiB

  1. const moment = require('moment')
  2. const childProcess = require('child_process')
  3. const _ = require('lodash')
  4. const configHelper = require('../helpers/config')
  5. /* global WIKI */
  6. class Job {
  7. constructor({
  8. name,
  9. immediate = false,
  10. schedule = 'P1D',
  11. repeat = false,
  12. worker = false
  13. }) {
  14. this.finished = Promise.resolve()
  15. this.name = name
  16. this.immediate = immediate
  17. this.schedule = moment.duration(schedule)
  18. this.repeat = repeat
  19. this.worker = worker
  20. }
  21. /**
  22. * Start Job
  23. *
  24. * @param {Object} data Job Data
  25. */
  26. start(data) {
  27. if (this.immediate) {
  28. this.invoke(data)
  29. } else {
  30. this.queue(data)
  31. }
  32. }
  33. /**
  34. * Queue the next job run according to the wait duration
  35. *
  36. * @param {Object} data Job Data
  37. */
  38. queue(data) {
  39. this.timeout = setTimeout(this.invoke.bind(this), this.schedule.asMilliseconds(), data)
  40. }
  41. /**
  42. * Run the actual job
  43. *
  44. * @param {Object} data Job Data
  45. */
  46. async invoke(data) {
  47. try {
  48. if (this.worker) {
  49. const proc = childProcess.fork(`server/core/worker.js`, [
  50. `--job=${this.name}`,
  51. `--data=${data}`
  52. ], {
  53. cwd: WIKI.ROOTPATH
  54. })
  55. this.finished = new Promise((resolve, reject) => {
  56. proc.on('exit', (code, signal) => {
  57. if (code === 0) {
  58. resolve()
  59. } else {
  60. reject(signal)
  61. }
  62. proc.kill()
  63. })
  64. })
  65. } else {
  66. this.finished = require(`../jobs/${this.name}`)(data)
  67. }
  68. await this.finished
  69. } catch (err) {
  70. WIKI.logger.warn(err)
  71. }
  72. if (this.repeat) {
  73. this.queue(data)
  74. }
  75. }
  76. /**
  77. * Stop any future job invocation from occuring
  78. */
  79. stop() {
  80. clearTimeout(this.timeout)
  81. }
  82. }
  83. module.exports = {
  84. jobs: [],
  85. init() {
  86. return this
  87. },
  88. start() {
  89. _.forOwn(WIKI.data.jobs, (queueParams, queueName) => {
  90. if (WIKI.config.offline && queueParams.offlineSkip) {
  91. WIKI.logger.warn(`Skipping job ${queueName} because offline mode is enabled. [SKIPPED]`)
  92. return
  93. }
  94. const schedule = (configHelper.isValidDurationString(queueParams.schedule)) ? queueParams.schedule : 'P1D'
  95. this.registerJob({
  96. name: _.kebabCase(queueName),
  97. immediate: _.get(queueParams, 'onInit', false),
  98. schedule: schedule,
  99. repeat: _.get(queueParams, 'repeat', false),
  100. worker: _.get(queueParams, 'worker', false)
  101. })
  102. })
  103. },
  104. registerJob(opts, data) {
  105. const job = new Job(opts)
  106. job.start(data)
  107. if (job.repeat) {
  108. this.jobs.push(job)
  109. }
  110. return job
  111. },
  112. stop() {
  113. this.jobs.forEach(job => {
  114. job.stop()
  115. })
  116. }
  117. }