You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

322 lines
8.7 KiB

  1. const _ = require('lodash')
  2. const stream = require('stream')
  3. const Promise = require('bluebird')
  4. const pipeline = Promise.promisify(stream.pipeline)
  5. /* global WIKI */
  6. module.exports = {
  7. async activate() {
  8. // not used
  9. },
  10. async deactivate() {
  11. // not used
  12. },
  13. /**
  14. * INIT
  15. */
  16. async init() {
  17. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Initializing...`)
  18. switch (this.config.apiVersion) {
  19. case '7.x':
  20. const { Client: Client7 } = require('elasticsearch7')
  21. this.client = new Client7({
  22. nodes: this.config.hosts.split(',').map(_.trim),
  23. sniffOnStart: this.config.sniffOnStart,
  24. sniffInterval: (this.config.sniffInterval > 0) ? this.config.sniffInterval : false,
  25. name: 'wiki-js'
  26. })
  27. break
  28. case '6.x':
  29. const { Client: Client6 } = require('elasticsearch6')
  30. this.client = new Client6({
  31. nodes: this.config.hosts.split(',').map(_.trim),
  32. sniffOnStart: this.config.sniffOnStart,
  33. sniffInterval: (this.config.sniffInterval > 0) ? this.config.sniffInterval : false,
  34. name: 'wiki-js'
  35. })
  36. break
  37. default:
  38. throw new Error('Unsupported version of elasticsearch! Update your settings in the Administration Area.')
  39. }
  40. // -> Create Search Index
  41. await this.createIndex()
  42. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Initialization completed.`)
  43. },
  44. /**
  45. * Create Index
  46. */
  47. async createIndex() {
  48. try {
  49. const indexExists = await this.client.indices.exists({ index: this.config.indexName })
  50. if (!indexExists.body) {
  51. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Creating index...`)
  52. try {
  53. const idxBody = {
  54. properties: {
  55. suggest: { type: 'completion' },
  56. title: { type: 'text', boost: 4.0 },
  57. description: { type: 'text', boost: 3.0 },
  58. content: { type: 'text', boost: 1.0 },
  59. locale: { type: 'keyword' },
  60. path: { type: 'text' }
  61. }
  62. }
  63. await this.client.indices.create({
  64. index: this.config.indexName,
  65. body: {
  66. mappings: (this.config.apiVersion === '6.x') ? {
  67. _doc: idxBody
  68. } : idxBody
  69. }
  70. })
  71. } catch (err) {
  72. WIKI.logger.error(`(SEARCH/ELASTICSEARCH) Create Index Error: `, _.get(err, 'meta.body.error', err))
  73. }
  74. }
  75. } catch (err) {
  76. WIKI.logger.error(`(SEARCH/ELASTICSEARCH) Index Check Error: `, _.get(err, 'meta.body.error', err))
  77. }
  78. },
  79. /**
  80. * QUERY
  81. *
  82. * @param {String} q Query
  83. * @param {Object} opts Additional options
  84. */
  85. async query(q, opts) {
  86. try {
  87. const results = await this.client.search({
  88. index: this.config.indexName,
  89. body: {
  90. query: {
  91. simple_query_string: {
  92. query: q
  93. }
  94. },
  95. from: 0,
  96. size: 50,
  97. _source: ['title', 'description', 'path', 'locale'],
  98. suggest: {
  99. suggestions: {
  100. text: q,
  101. completion: {
  102. field: 'suggest',
  103. size: 5,
  104. skip_duplicates: true,
  105. fuzzy: true
  106. }
  107. }
  108. }
  109. }
  110. })
  111. return {
  112. results: _.get(results, 'body.hits.hits', []).map(r => ({
  113. id: r._id,
  114. locale: r._source.locale,
  115. path: r._source.path,
  116. title: r._source.title,
  117. description: r._source.description
  118. })),
  119. suggestions: _.reject(_.get(results, 'suggest.suggestions', []).map(s => _.get(s, 'options[0].text', false)), s => !s),
  120. totalHits: _.get(results, 'body.hits.total.value', _.get(results, 'body.hits.total', 0))
  121. }
  122. } catch (err) {
  123. WIKI.logger.warn('Search Engine Error: ', _.get(err, 'meta.body.error', err))
  124. }
  125. },
  126. /**
  127. * Build suggest field
  128. */
  129. buildSuggest(page) {
  130. return _.uniq(_.concat(
  131. page.title.split(' ').map(s => ({
  132. input: s,
  133. weight: 4
  134. })),
  135. page.description.split(' ').map(s => ({
  136. input: s,
  137. weight: 3
  138. })),
  139. page.safeContent.split(' ').map(s => ({
  140. input: s,
  141. weight: 1
  142. }))
  143. ))
  144. },
  145. /**
  146. * CREATE
  147. *
  148. * @param {Object} page Page to create
  149. */
  150. async created(page) {
  151. await this.client.index({
  152. index: this.config.indexName,
  153. type: '_doc',
  154. id: page.hash,
  155. body: {
  156. suggest: this.buildSuggest(page),
  157. locale: page.localeCode,
  158. path: page.path,
  159. title: page.title,
  160. description: page.description,
  161. content: page.safeContent
  162. },
  163. refresh: true
  164. })
  165. },
  166. /**
  167. * UPDATE
  168. *
  169. * @param {Object} page Page to update
  170. */
  171. async updated(page) {
  172. await this.client.index({
  173. index: this.config.indexName,
  174. type: '_doc',
  175. id: page.hash,
  176. body: {
  177. suggest: this.buildSuggest(page),
  178. locale: page.localeCode,
  179. path: page.path,
  180. title: page.title,
  181. description: page.description,
  182. content: page.safeContent
  183. },
  184. refresh: true
  185. })
  186. },
  187. /**
  188. * DELETE
  189. *
  190. * @param {Object} page Page to delete
  191. */
  192. async deleted(page) {
  193. await this.client.delete({
  194. index: this.config.indexName,
  195. type: '_doc',
  196. id: page.hash,
  197. refresh: true
  198. })
  199. },
  200. /**
  201. * RENAME
  202. *
  203. * @param {Object} page Page to rename
  204. */
  205. async renamed(page) {
  206. await this.client.delete({
  207. index: this.config.indexName,
  208. type: '_doc',
  209. id: page.sourceHash,
  210. refresh: true
  211. })
  212. await this.client.index({
  213. index: this.config.indexName,
  214. type: '_doc',
  215. id: page.destinationHash,
  216. body: {
  217. suggest: this.buildSuggest(page),
  218. locale: page.localeCode,
  219. path: page.destinationPath,
  220. title: page.title,
  221. description: page.description,
  222. content: page.safeContent
  223. },
  224. refresh: true
  225. })
  226. },
  227. /**
  228. * REBUILD INDEX
  229. */
  230. async rebuild() {
  231. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Rebuilding Index...`)
  232. await this.client.indices.delete({ index: this.config.indexName })
  233. await this.createIndex()
  234. const MAX_INDEXING_BYTES = 10 * Math.pow(2, 20) - Buffer.from('[').byteLength - Buffer.from(']').byteLength // 10 MB
  235. const MAX_INDEXING_COUNT = 1000
  236. const COMMA_BYTES = Buffer.from(',').byteLength
  237. let chunks = []
  238. let bytes = 0
  239. const processDocument = async (cb, doc) => {
  240. try {
  241. if (doc) {
  242. const docBytes = Buffer.from(JSON.stringify(doc)).byteLength
  243. // -> Current batch exceeds size limit, flush
  244. if (docBytes + COMMA_BYTES + bytes >= MAX_INDEXING_BYTES) {
  245. await flushBuffer()
  246. }
  247. if (chunks.length > 0) {
  248. bytes += COMMA_BYTES
  249. }
  250. bytes += docBytes
  251. chunks.push(doc)
  252. // -> Current batch exceeds count limit, flush
  253. if (chunks.length >= MAX_INDEXING_COUNT) {
  254. await flushBuffer()
  255. }
  256. } else {
  257. // -> End of stream, flush
  258. await flushBuffer()
  259. }
  260. cb()
  261. } catch (err) {
  262. cb(err)
  263. }
  264. }
  265. const flushBuffer = async () => {
  266. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Sending batch of ${chunks.length}...`)
  267. try {
  268. await this.client.bulk({
  269. index: this.config.indexName,
  270. body: _.reduce(chunks, (result, doc) => {
  271. result.push({
  272. index: {
  273. _index: this.config.indexName,
  274. _type: '_doc',
  275. _id: doc.id
  276. }
  277. })
  278. doc.safeContent = WIKI.models.pages.cleanHTML(doc.render)
  279. result.push({
  280. suggest: this.buildSuggest(doc),
  281. locale: doc.locale,
  282. path: doc.path,
  283. title: doc.title,
  284. description: doc.description,
  285. content: doc.safeContent
  286. })
  287. return result
  288. }, []),
  289. refresh: true
  290. })
  291. } catch (err) {
  292. WIKI.logger.warn('(SEARCH/ELASTICSEARCH) Failed to send batch to elasticsearch: ', err)
  293. }
  294. chunks.length = 0
  295. bytes = 0
  296. }
  297. await pipeline(
  298. WIKI.models.knex.column({ id: 'hash' }, 'path', { locale: 'localeCode' }, 'title', 'description', 'render').select().from('pages').where({
  299. isPublished: true,
  300. isPrivate: false
  301. }).stream(),
  302. new stream.Transform({
  303. objectMode: true,
  304. transform: async (chunk, enc, cb) => processDocument(cb, chunk),
  305. flush: async (cb) => processDocument(cb)
  306. })
  307. )
  308. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Index rebuilt successfully.`)
  309. }
  310. }