You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

134 lines
3.2 KiB

  1. const moment = require('moment')
  2. const childProcess = require('child_process')
  3. const _ = require('lodash')
  4. const configHelper = require('../helpers/config')
  5. /* global WIKI */
  6. class Job {
  7. constructor({
  8. name,
  9. immediate = false,
  10. schedule = 'P1D',
  11. repeat = false,
  12. worker = false
  13. }, queue) {
  14. this.queue = queue
  15. this.finished = Promise.resolve()
  16. this.name = name
  17. this.immediate = immediate
  18. this.schedule = moment.duration(schedule)
  19. this.repeat = repeat
  20. this.worker = worker
  21. }
  22. /**
  23. * Start Job
  24. *
  25. * @param {Object} data Job Data
  26. */
  27. start(data) {
  28. this.queue.jobs.push(this)
  29. if (this.immediate) {
  30. this.invoke(data)
  31. } else {
  32. this.queue(data)
  33. }
  34. }
  35. /**
  36. * Queue the next job run according to the wait duration
  37. *
  38. * @param {Object} data Job Data
  39. */
  40. queue(data) {
  41. this.timeout = setTimeout(this.invoke.bind(this), this.schedule.asMilliseconds(), data)
  42. }
  43. /**
  44. * Run the actual job
  45. *
  46. * @param {Object} data Job Data
  47. */
  48. async invoke(data) {
  49. try {
  50. if (this.worker) {
  51. const proc = childProcess.fork(`server/core/worker.js`, [
  52. `--job=${this.name}`,
  53. `--data=${data}`
  54. ], {
  55. cwd: WIKI.ROOTPATH,
  56. stdio: ['inherit', 'inherit', 'pipe', 'ipc']
  57. })
  58. const stderr = [];
  59. proc.stderr.on('data', chunk => stderr.push(chunk))
  60. this.finished = new Promise((resolve, reject) => {
  61. proc.on('exit', (code, signal) => {
  62. const data = Buffer.concat(stderr).toString()
  63. if (code === 0) {
  64. resolve(data)
  65. } else {
  66. const err = new Error(`Error when running job ${this.name}: ${data}`)
  67. err.exitSignal = signal
  68. err.exitCode = code
  69. err.stderr = data
  70. reject(err)
  71. }
  72. proc.kill()
  73. })
  74. })
  75. } else {
  76. this.finished = require(`../jobs/${this.name}`)(data)
  77. }
  78. await this.finished
  79. } catch (err) {
  80. WIKI.logger.warn(err)
  81. }
  82. if (this.repeat && this.queue.jobs.includes(this)) {
  83. this.queue(data)
  84. } else {
  85. this.stop().catch(() => {})
  86. }
  87. }
  88. /**
  89. * Stop any future job invocation from occuring
  90. */
  91. async stop() {
  92. clearTimeout(this.timeout)
  93. this.queue.jobs = this.queue.jobs.filter(x => x !== this)
  94. return this.finished
  95. }
  96. }
  97. module.exports = {
  98. jobs: [],
  99. init() {
  100. return this
  101. },
  102. start() {
  103. _.forOwn(WIKI.data.jobs, (queueParams, queueName) => {
  104. if (WIKI.config.offline && queueParams.offlineSkip) {
  105. WIKI.logger.warn(`Skipping job ${queueName} because offline mode is enabled. [SKIPPED]`)
  106. return
  107. }
  108. const schedule = (configHelper.isValidDurationString(queueParams.schedule)) ? queueParams.schedule : 'P1D'
  109. this.registerJob({
  110. name: _.kebabCase(queueName),
  111. immediate: _.get(queueParams, 'onInit', false),
  112. schedule: schedule,
  113. repeat: _.get(queueParams, 'repeat', false),
  114. worker: _.get(queueParams, 'worker', false)
  115. })
  116. })
  117. },
  118. registerJob(opts, data) {
  119. const job = new Job(opts, this)
  120. job.start(data)
  121. return job
  122. },
  123. async stop() {
  124. return Promise.all(this.jobs.map(job => job.stop()))
  125. }
  126. }