You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

300 lines
7.7 KiB

  1. const _ = require('lodash')
  2. const elasticsearch = require('elasticsearch')
  3. const stream = require('stream')
  4. const Promise = require('bluebird')
  5. const pipeline = Promise.promisify(stream.pipeline)
  6. /* global WIKI */
  7. module.exports = {
  8. async activate() {
  9. // not used
  10. },
  11. async deactivate() {
  12. // not used
  13. },
  14. /**
  15. * INIT
  16. */
  17. async init() {
  18. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Initializing...`)
  19. this.client = new elasticsearch.Client({
  20. apiVersion: this.config.apiVersion,
  21. hosts: this.config.hosts.split(',').map(_.trim),
  22. httpAuth: (this.config.user.length > 0) ? `${this.config.user}:${this.config.pass}` : null,
  23. sniffOnStart: this.config.sniffOnStart,
  24. sniffInterval: (this.config.sniffInterval > 0) ? this.config.sniffInterval : false
  25. })
  26. // -> Create Search Index
  27. await this.createIndex()
  28. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Initialization completed.`)
  29. },
  30. /**
  31. * Create Index
  32. */
  33. async createIndex() {
  34. const indexExists = await this.client.indices.exists({ index: this.config.indexName })
  35. if (!indexExists) {
  36. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Creating index...`)
  37. await this.client.indices.create({
  38. index: this.config.indexName,
  39. body: {
  40. mappings: {
  41. _doc: {
  42. properties: {
  43. suggest: { type: 'completion' },
  44. title: { type: 'text', boost: 4.0 },
  45. description: { type: 'text', boost: 3.0 },
  46. content: { type: 'text', boost: 1.0 },
  47. locale: { type: 'keyword' },
  48. path: { type: 'text' }
  49. }
  50. }
  51. }
  52. }
  53. })
  54. }
  55. },
  56. /**
  57. * QUERY
  58. *
  59. * @param {String} q Query
  60. * @param {Object} opts Additional options
  61. */
  62. async query(q, opts) {
  63. try {
  64. const results = await this.client.search({
  65. index: this.config.indexName,
  66. body: {
  67. query: {
  68. simple_query_string: {
  69. query: q
  70. }
  71. },
  72. from: 0,
  73. size: 50,
  74. _source: ['title', 'description', 'path', 'locale'],
  75. suggest: {
  76. suggestions: {
  77. text: q,
  78. completion: {
  79. field: 'suggest',
  80. size: 5,
  81. skip_duplicates: true,
  82. fuzzy: true
  83. }
  84. }
  85. }
  86. }
  87. })
  88. return {
  89. results: _.get(results, 'hits.hits', []).map(r => ({
  90. id: r._id,
  91. locale: r._source.locale,
  92. path: r._source.path,
  93. title: r._source.title,
  94. description: r._source.description
  95. })),
  96. suggestions: _.reject(_.get(results, 'suggest.suggestions', []).map(s => _.get(s, 'options[0].text', false)), s => !s),
  97. totalHits: results.hits.total
  98. }
  99. } catch (err) {
  100. WIKI.logger.warn('Search Engine Error:')
  101. WIKI.logger.warn(err)
  102. }
  103. },
  104. /**
  105. * Build suggest field
  106. */
  107. buildSuggest(page) {
  108. return _.uniq(_.concat(
  109. page.title.split(' ').map(s => ({
  110. input: s,
  111. weight: 4
  112. })),
  113. page.description.split(' ').map(s => ({
  114. input: s,
  115. weight: 3
  116. })),
  117. page.safeContent.split(' ').map(s => ({
  118. input: s,
  119. weight: 1
  120. }))
  121. ))
  122. },
  123. /**
  124. * CREATE
  125. *
  126. * @param {Object} page Page to create
  127. */
  128. async created(page) {
  129. await this.client.index({
  130. index: this.config.indexName,
  131. type: '_doc',
  132. id: page.hash,
  133. body: {
  134. suggest: this.buildSuggest(page),
  135. locale: page.localeCode,
  136. path: page.path,
  137. title: page.title,
  138. description: page.description,
  139. content: page.safeContent
  140. },
  141. refresh: true
  142. })
  143. },
  144. /**
  145. * UPDATE
  146. *
  147. * @param {Object} page Page to update
  148. */
  149. async updated(page) {
  150. await this.client.index({
  151. index: this.config.indexName,
  152. type: '_doc',
  153. id: page.hash,
  154. body: {
  155. suggest: this.buildSuggest(page),
  156. locale: page.localeCode,
  157. path: page.path,
  158. title: page.title,
  159. description: page.description,
  160. content: page.safeContent
  161. },
  162. refresh: true
  163. })
  164. },
  165. /**
  166. * DELETE
  167. *
  168. * @param {Object} page Page to delete
  169. */
  170. async deleted(page) {
  171. await this.client.delete({
  172. index: this.config.indexName,
  173. type: '_doc',
  174. id: page.hash,
  175. refresh: true
  176. })
  177. },
  178. /**
  179. * RENAME
  180. *
  181. * @param {Object} page Page to rename
  182. */
  183. async renamed(page) {
  184. await this.client.delete({
  185. index: this.config.indexName,
  186. type: '_doc',
  187. id: page.sourceHash,
  188. refresh: true
  189. })
  190. await this.client.index({
  191. index: this.config.indexName,
  192. type: '_doc',
  193. id: page.destinationHash,
  194. body: {
  195. suggest: this.buildSuggest(page),
  196. locale: page.localeCode,
  197. path: page.destinationPath,
  198. title: page.title,
  199. description: page.description,
  200. content: page.safeContent
  201. },
  202. refresh: true
  203. })
  204. },
  205. /**
  206. * REBUILD INDEX
  207. */
  208. async rebuild() {
  209. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Rebuilding Index...`)
  210. await this.client.indices.delete({ index: this.config.indexName })
  211. await this.createIndex()
  212. const MAX_INDEXING_BYTES = 10 * Math.pow(2, 20) - Buffer.from('[').byteLength - Buffer.from(']').byteLength // 10 MB
  213. const MAX_INDEXING_COUNT = 1000
  214. const COMMA_BYTES = Buffer.from(',').byteLength
  215. let chunks = []
  216. let bytes = 0
  217. const processDocument = async (cb, doc) => {
  218. try {
  219. if (doc) {
  220. const docBytes = Buffer.from(JSON.stringify(doc)).byteLength
  221. // -> Current batch exceeds size limit, flush
  222. if (docBytes + COMMA_BYTES + bytes >= MAX_INDEXING_BYTES) {
  223. await flushBuffer()
  224. }
  225. if (chunks.length > 0) {
  226. bytes += COMMA_BYTES
  227. }
  228. bytes += docBytes
  229. chunks.push(doc)
  230. // -> Current batch exceeds count limit, flush
  231. if (chunks.length >= MAX_INDEXING_COUNT) {
  232. await flushBuffer()
  233. }
  234. } else {
  235. // -> End of stream, flush
  236. await flushBuffer()
  237. }
  238. cb()
  239. } catch (err) {
  240. cb(err)
  241. }
  242. }
  243. const flushBuffer = async () => {
  244. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Sending batch of ${chunks.length}...`)
  245. try {
  246. await this.client.bulk({
  247. index: this.config.indexName,
  248. body: _.reduce(chunks, (result, doc) => {
  249. result.push({
  250. index: {
  251. _index: this.config.indexName,
  252. _type: '_doc',
  253. _id: doc.id
  254. }
  255. })
  256. doc.safeContent = WIKI.models.pages.cleanHTML(doc.render)
  257. result.push({
  258. suggest: this.buildSuggest(doc),
  259. locale: doc.locale,
  260. path: doc.path,
  261. title: doc.title,
  262. description: doc.description,
  263. content: doc.safeContent
  264. })
  265. return result
  266. }, []),
  267. refresh: true
  268. })
  269. } catch (err) {
  270. WIKI.logger.warn('(SEARCH/ELASTICSEARCH) Failed to send batch to elasticsearch: ', err)
  271. }
  272. chunks.length = 0
  273. bytes = 0
  274. }
  275. await pipeline(
  276. WIKI.models.knex.column({ id: 'hash' }, 'path', { locale: 'localeCode' }, 'title', 'description', 'render').select().from('pages').where({
  277. isPublished: true,
  278. isPrivate: false
  279. }).stream(),
  280. new stream.Transform({
  281. objectMode: true,
  282. transform: async (chunk, enc, cb) => processDocument(cb, chunk),
  283. flush: async (cb) => processDocument(cb)
  284. })
  285. )
  286. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Index rebuilt successfully.`)
  287. }
  288. }