|
|
const _ = require('lodash')
const AWS = require('aws-sdk')
const stream = require('stream')
const Promise = require('bluebird')
const pipeline = Promise.promisify(stream.pipeline)
/* global WIKI */
module.exports = { async activate() { // not used
}, async deactivate() { // not used
}, /** * INIT */ async init() { WIKI.logger.info(`(SEARCH/AWS) Initializing...`) this.client = new AWS.CloudSearch({ apiVersion: '2013-01-01', accessKeyId: this.config.accessKeyId, secretAccessKey: this.config.secretAccessKey, region: this.config.region }) this.clientDomain = new AWS.CloudSearchDomain({ apiVersion: '2013-01-01', endpoint: this.config.endpoint, accessKeyId: this.config.accessKeyId, secretAccessKey: this.config.secretAccessKey, region: this.config.region })
let rebuildIndex = false
// -> Define Analysis Schemes
const schemes = await this.client.describeAnalysisSchemes({ DomainName: this.config.domain, AnalysisSchemeNames: ['default_anlscheme'] }).promise() if (_.get(schemes, 'AnalysisSchemes', []).length < 1) { WIKI.logger.info(`(SEARCH/AWS) Defining Analysis Scheme...`) await this.client.defineAnalysisScheme({ DomainName: this.config.domain, AnalysisScheme: { AnalysisSchemeLanguage: this.config.AnalysisSchemeLang, AnalysisSchemeName: 'default_anlscheme' } }).promise() rebuildIndex = true }
// -> Define Index Fields
const fields = await this.client.describeIndexFields({ DomainName: this.config.domain }).promise() if (_.get(fields, 'IndexFields', []).length < 1) { WIKI.logger.info(`(SEARCH/AWS) Defining Index Fields...`) await this.client.defineIndexField({ DomainName: this.config.domain, IndexField: { IndexFieldName: 'id', IndexFieldType: 'literal' } }).promise() await this.client.defineIndexField({ DomainName: this.config.domain, IndexField: { IndexFieldName: 'path', IndexFieldType: 'literal' } }).promise() await this.client.defineIndexField({ DomainName: this.config.domain, IndexField: { IndexFieldName: 'locale', IndexFieldType: 'literal' } }).promise() await this.client.defineIndexField({ DomainName: this.config.domain, IndexField: { IndexFieldName: 'title', IndexFieldType: 'text', TextOptions: { ReturnEnabled: true, AnalysisScheme: 'default_anlscheme' } } }).promise() await this.client.defineIndexField({ DomainName: this.config.domain, IndexField: { IndexFieldName: 'description', IndexFieldType: 'text', TextOptions: { ReturnEnabled: true, AnalysisScheme: 'default_anlscheme' } } }).promise() await this.client.defineIndexField({ DomainName: this.config.domain, IndexField: { IndexFieldName: 'content', IndexFieldType: 'text', TextOptions: { ReturnEnabled: false, AnalysisScheme: 'default_anlscheme' } } }).promise() rebuildIndex = true }
// -> Define suggester
const suggesters = await this.client.describeSuggesters({ DomainName: this.config.domain, SuggesterNames: ['default_suggester'] }).promise() if (_.get(suggesters, 'Suggesters', []).length < 1) { WIKI.logger.info(`(SEARCH/AWS) Defining Suggester...`) await this.client.defineSuggester({ DomainName: this.config.domain, Suggester: { SuggesterName: 'default_suggester', DocumentSuggesterOptions: { SourceField: 'title', FuzzyMatching: 'high' } } }).promise() rebuildIndex = true }
// -> Rebuild Index
if (rebuildIndex) { WIKI.logger.info(`(SEARCH/AWS) Requesting Index Rebuild...`) await this.client.indexDocuments({ DomainName: this.config.domain }).promise() }
WIKI.logger.info(`(SEARCH/AWS) Initialization completed.`) }, /** * QUERY * * @param {String} q Query * @param {Object} opts Additional options */ async query(q, opts) { try { let suggestions = [] const results = await this.clientDomain.search({ query: q, partial: true, size: 50 }).promise() if (results.hits.found < 5) { const suggestResults = await this.clientDomain.suggest({ query: q, suggester: 'default_suggester', size: 5 }).promise() suggestions = suggestResults.suggest.suggestions.map(s => s.suggestion) } return { results: _.map(results.hits.hit, r => ({ id: r.id, path: _.head(r.fields.path), locale: _.head(r.fields.locale), title: _.head(r.fields.title) || '', description: _.head(r.fields.description) || '' })), suggestions: suggestions, totalHits: results.hits.found } } catch (err) { WIKI.logger.warn('Search Engine Error:') WIKI.logger.warn(err) } }, /** * CREATE * * @param {Object} page Page to create */ async created(page) { await this.clientDomain.uploadDocuments({ contentType: 'application/json', documents: JSON.stringify([ { type: 'add', id: page.hash, fields: { locale: page.localeCode, path: page.path, title: page.title, description: page.description, content: page.safeContent } } ]) }).promise() }, /** * UPDATE * * @param {Object} page Page to update */ async updated(page) { await this.clientDomain.uploadDocuments({ contentType: 'application/json', documents: JSON.stringify([ { type: 'add', id: page.hash, fields: { locale: page.localeCode, path: page.path, title: page.title, description: page.description, content: page.safeContent } } ]) }).promise() }, /** * DELETE * * @param {Object} page Page to delete */ async deleted(page) { await this.clientDomain.uploadDocuments({ contentType: 'application/json', documents: JSON.stringify([ { type: 'delete', id: page.hash } ]) }).promise() }, /** * RENAME * * @param {Object} page Page to rename */ async renamed(page) { await this.clientDomain.uploadDocuments({ contentType: 'application/json', documents: 
JSON.stringify([ { type: 'delete', id: page.sourceHash } ]) }).promise() await this.clientDomain.uploadDocuments({ contentType: 'application/json', documents: JSON.stringify([ { type: 'add', id: page.destinationHash, fields: { locale: page.localeCode, path: page.destinationPath, title: page.title, description: page.description, content: page.safeContent } } ]) }).promise() }, /** * REBUILD INDEX */ async rebuild() { WIKI.logger.info(`(SEARCH/AWS) Rebuilding Index...`)
const MAX_DOCUMENT_BYTES = Math.pow(2, 20) const MAX_INDEXING_BYTES = 5 * Math.pow(2, 20) - Buffer.from('[').byteLength - Buffer.from(']').byteLength const MAX_INDEXING_COUNT = 1000 const COMMA_BYTES = Buffer.from(',').byteLength
let chunks = [] let bytes = 0
const processDocument = async (cb, doc) => { try { if (doc) { const docBytes = Buffer.from(JSON.stringify(doc)).byteLength // -> Document too large
if (docBytes >= MAX_DOCUMENT_BYTES) { throw new Error('Document exceeds maximum size allowed by AWS CloudSearch.') }
// -> Current batch exceeds size hard limit, flush
if (docBytes + COMMA_BYTES + bytes >= MAX_INDEXING_BYTES) { await flushBuffer() }
if (chunks.length > 0) { bytes += COMMA_BYTES } bytes += docBytes chunks.push(doc)
// -> Current batch exceeds count soft limit, flush
if (chunks.length >= MAX_INDEXING_COUNT) { await flushBuffer() } } else { // -> End of stream, flush
await flushBuffer() } cb() } catch (err) { cb(err) } }
const flushBuffer = async () => { WIKI.logger.info(`(SEARCH/AWS) Sending batch of ${chunks.length}...`) try { await this.clientDomain.uploadDocuments({ contentType: 'application/json', documents: JSON.stringify(_.map(chunks, doc => ({ type: 'add', id: doc.id, fields: { locale: doc.locale, path: doc.path, title: doc.title, description: doc.description, content: WIKI.models.pages.cleanHTML(doc.render) } }))) }).promise() } catch (err) { WIKI.logger.warn('(SEARCH/AWS) Failed to send batch to AWS CloudSearch: ', err) } chunks.length = 0 bytes = 0 }
await pipeline( WIKI.models.knex.column({ id: 'hash' }, 'path', { locale: 'localeCode' }, 'title', 'description', 'render').select().from('pages').where({ isPublished: true, isPrivate: false }).stream(), new stream.Transform({ objectMode: true, transform: async (chunk, enc, cb) => processDocument(cb, chunk), flush: async (cb) => processDocument(cb) }) )
WIKI.logger.info(`(SEARCH/AWS) Requesting Index Rebuild...`) await this.client.indexDocuments({ DomainName: this.config.domain }).promise()
WIKI.logger.info(`(SEARCH/AWS) Index rebuilt successfully.`) } }
|