diff --git a/.gitignore b/.gitignore index 0163acba..2716bcc3 100644 --- a/.gitignore +++ b/.gitignore @@ -107,3 +107,6 @@ sw.* # Storybook .nuxt-storybook storybook-static + +# rclone filter rules +open-panda-dataset-meta-bk__filter.txt diff --git a/package-lock.json b/package-lock.json index 61195e98..3e4642fa 100644 --- a/package-lock.json +++ b/package-lock.json @@ -29253,6 +29253,11 @@ "microevent.ts": "~0.1.1" } }, + "node_modules/workerpool": { + "version": "6.4.0", + "resolved": "https://registry.npmjs.org/workerpool/-/workerpool-6.4.0.tgz", + "integrity": "sha512-i3KR1mQMNwY2wx20ozq2EjISGtQWDIfV56We+yGJ5yDs8jTwQiLLaqHlkBHITlCuJnYlVRmXegxFxZg7gqI++A==" + }, "node_modules/wrap-ansi": { "version": "7.0.0", "resolved": "https://registry.npmjs.org/wrap-ansi/-/wrap-ansi-7.0.0.tgz", @@ -29608,7 +29613,8 @@ "socket.io": "^4.5.2", "socket.io-client": "^4.5.2", "uuid": "^8.3.1", - "uuid-apikey": "^1.5.1" + "uuid-apikey": "^1.5.1", + "workerpool": "^6.4.0" }, "devDependencies": { "eslint": "^8.23.0", @@ -37466,7 +37472,8 @@ "socket.io": "^4.5.2", "socket.io-client": "^4.5.2", "uuid": "^8.3.1", - "uuid-apikey": "^1.5.1" + "uuid-apikey": "^1.5.1", + "workerpool": "*" } }, "better-opn": { @@ -52182,6 +52189,11 @@ "microevent.ts": "~0.1.1" } }, + "workerpool": { + "version": "6.4.0", + "resolved": "https://registry.npmjs.org/workerpool/-/workerpool-6.4.0.tgz", + "integrity": "sha512-i3KR1mQMNwY2wx20ozq2EjISGtQWDIfV56We+yGJ5yDs8jTwQiLLaqHlkBHITlCuJnYlVRmXegxFxZg7gqI++A==" + }, "wrap-ansi": { "version": "7.0.0", "resolved": "https://registry.npmjs.org/wrap-ansi/-/wrap-ansi-7.0.0.tgz", diff --git a/packages/be/crons/cid-batch-import.js b/packages/be/crons/cid-batch-import.js new file mode 100644 index 00000000..8fc989a8 --- /dev/null +++ b/packages/be/crons/cid-batch-import.js @@ -0,0 +1,261 @@ +// ///////////////////////////////////////////////////// Imports + general setup +// ----------------------------------------------------------------------------- +const ModuleAlias = require('module-alias') +const Path = require('path') +const Axios = require('axios') +const Fs = require('fs-extra') +const Express = require('express') +const Util = require('util') +const Stream = require('stream') +const Pipeline = Util.promisify(Stream.pipeline) +const Spawn = require('child_process').spawn +const Mongoose = require('mongoose') +const MongooseSlugUpdater = require('mongoose-slug-updater') + +require('dotenv').config({ path: Path.resolve(__dirname, '../.env') }) + +const MC = require('../config') + +const CID_TMP_DIR = Path.resolve(`${MC.packageRoot}/tmp/cid-files`) + +// ////////////////////////////////////////////////////////////////// Initialize +MC.app = Express() +Mongoose.plugin(MongooseSlugUpdater) + +// ///////////////////////////////////////////////////////////////////// Aliases +ModuleAlias.addAliases({ + '@Root': MC.packageRoot, + '@Modules': `${MC.packageRoot}/modules` +}) + +try { + const modulesRoot = `${MC.packageRoot}/modules` + const items = Fs.readdirSync(modulesRoot) + items.forEach((name) => { + const path = `${modulesRoot}/${name}` + if (Fs.statSync(path).isDirectory()) { + const moduleName = `${name[0].toUpperCase() + name.substring(1)}` + ModuleAlias.addAlias(`@Module_${moduleName}`, path) + } + }) +} catch (e) { + console.log(e) +} + +// ///////////////////////////////////////////////////////////////////// Modules +require('@Module_Database') +require('@Module_Cidtemp') +const { GetFileFromDisk } = require('@Module_Utilities') +const { InitializeWorker } = 
require('@Root/scripts/worker-pool-batch-processor.js')
+
+// /////////////////////////////////////////////////////////////////// Functions
+// ------------------------------------------------------------- retrieveCidFile
+const retrieveCidFile = async (line, batchNo, retryNo = 0) => {
+  try {
+    if (retryNo > 0) {
+      console.log(`Retry number ${retryNo}`)
+    }
+    const upload = JSON.parse(line)
+    // fetch file using upload cid
+    const response = await Axios.get(`https://${upload.cid}.ipfs.w3s.link/`, {
+      responseType: 'stream'
+    })
+    // if a file already exists with this name in the temp folder,
+    // delete it to make way for an updated version
+    await deleteTemporaryFile(`batch_${batchNo}/${upload.name}`)
+    // write file data to new zst file in the temp folder
+    await Pipeline(response.data, Fs.createWriteStream(`${CID_TMP_DIR}/batch_${batchNo}/${upload.name}`))
+    // unpack the zst and return the inner json
+    return await unpackRetrievedFile({
+      cid: upload.cid,
+      name: upload.name,
+      updated: upload.updated,
+      created: upload.created
+    }, batchNo)
+  } catch (e) {
+    if (retryNo < 10) {
+      console.log(`Error retrieving CID ${JSON.parse(line).cid}. Retrying retrieval...`)
+      return retrieveCidFile(line, batchNo, retryNo + 1)
+    } else {
+      const cid = JSON.parse(line).cid
+      await cacheFailedCID(cid)
+      console.log('=============================== [Function: retrieveCidFile]')
+      console.log(`Could not retrieve CID ${cid}. Max retries reached.`)
+      console.log(e)
+    }
+  }
+}
+
+// --------------------------------------------------------------- unpackZstFile
+const unpackZstFile = (file, batchNo) => {
+  return new Promise((resolve, reject) => {
+    const unzstd = Spawn('unzstd', [`../tmp/cid-files/batch_${batchNo}/${file.name}`])
+    const errors = []
+    unzstd.stderr.on('data', (msg) => {
+      errors.push(`Error unpacking ${file.name}: ${msg.toString()}`)
+    })
+    unzstd.on('exit', (code) => {
+      const err = errors.length > 0 && code !== 0
+      if (err) {
+        console.log(errors.join('\n'))
+      }
+      resolve()
+    })
+  })
+}
+
+// --------------------------------------------------------- unpackRetrievedFile
+const unpackRetrievedFile = async (file, batchNo) => {
+  try {
+    const jsonFilename = file.name.substring(0, file.name.length - 4)
+    await unpackZstFile(file, batchNo)
+    const json = await GetFileFromDisk(`${CID_TMP_DIR}/batch_${batchNo}/${jsonFilename}`, true)
+    const fileMetadata = {
+      piece_cid: json.piece_cid,
+      payload_cid: json.payload_cid,
+      raw_car_file_size: json.raw_car_file_size,
+      dataset_slug: json.dataset,
+      filename: jsonFilename,
+      web3storageCreatedAt: file.created,
+      web3storageUpdatedAt: file.updated
+    }
+    await deleteTemporaryFile(`batch_${batchNo}/${jsonFilename}`)
+    return fileMetadata
+  } catch (e) {
+    console.log('=========================== [Function: unpackRetrievedFile]')
+    console.log(e)
+  }
+}
+
+// -------------------------------------------------------------- cacheFailedCID
+const cacheFailedCID = async (cid) => {
+  try {
+    await Pipeline(`${cid}\n`, Fs.createWriteStream(`${CID_TMP_DIR}/failed-cid-retrievals.txt`, { flags: 'a' }))
+  } catch (e) {
+    console.log('================================= [Function: cacheFailedCID]')
+    console.log(e)
+  }
+}
+
+// --------------------------------------------------------- deleteTemporaryFile
+const deleteTemporaryFile = async (path) => {
+  try {
+    if (Fs.existsSync(`${CID_TMP_DIR}/${path}`)) {
+      Fs.unlinkSync(`${CID_TMP_DIR}/${path}`)
+    }
+  } catch (e) {
+    console.log('============================ [Function: deleteTemporaryFile]')
+    console.log(e)
+  }
+}
+
+// ------------------------------------------------ writeBatchMetadataToDatabase
+const writeBatchMetadataToDatabase = async (retrievedFiles) => {
+  try {
+    const operations = []
+    const len = retrievedFiles.length
+    for (let i = 0; i < len; i++) {
+      const file = retrievedFiles[i]
+      operations.push({
+        updateOne: {
+          filter: { payload_cid: file.payload_cid },
+          update: { $set: file },
+          upsert: true
+        }
+      })
+    }
+    const response = await MC.model.Cidtemp.bulkWrite(operations, { ordered: false })
+    return response.result
+  } catch (e) {
+    console.log('==================== [Function: writeBatchMetadataToDatabase]')
+    console.log(e)
+  }
+}
+
+// ------------------------------------------------- backupCidsToBackblazeBucket
+const backupCidsToBackblazeBucket = async (batchNo) => {
+  try {
+    const rclone = Spawn('rclone', [
+      'copy',
+      `${MC.tmpRoot}/cid-files/batch_${batchNo}`,
+      process.env.B2_OPENPANDA_BUCKET,
+      '--filter-from',
+      process.env.B2_OPENPANDA_FILTER
+    ])
+    const errors = []
+    for await (const msg of rclone.stderr) {
+      errors.push(msg.toString())
+    }
+    return await new Promise((resolve, reject) => {
+      rclone.on('exit', (code) => {
+        const err = errors.length > 0 && code !== 0
+        err ? reject({
+          success: false,
+          message: errors.join('\n\n')
+        }) : resolve({
+          success: true,
+          message: `✓ CID batch ${batchNo} backup successful`
+        })
+      })
+    })
+  } catch (e) {
+    console.log('===================== [Function: backupCidsToBackblazeBucket]')
+    console.log(e)
+  }
+}
+
+// -------------------------------------------------------- processManifestBatch
+const processManifestBatch = async (batch, batchNo) => {
+  try {
+    // make a temporary subdirectory for this batch
+    if (!Fs.existsSync(`${CID_TMP_DIR}/batch_${batchNo}`)) {
+      Fs.mkdirSync(`${CID_TMP_DIR}/batch_${batchNo}`)
+    }
+    // individually download each CID file in the batch
+    // save zst to a temp/cid-files/batch_x folder, extract and return metadata
+    // to the retrieved array
+    const len = batch.length
+    const retrievedFiles = []
+    for (let i = 0; i < len; i++) {
+      const cidManifestItem = batch[i]
+      const retrieved = await retrieveCidFile(cidManifestItem, batchNo)
+      if (retrieved) { retrievedFiles.push(retrieved) }
+    }
+    if (!retrievedFiles.length) {
+      throw new Error('No CIDs could be retrieved from this batch')
+    }
+    // save batch metadata to the database
+    const databaseWriteResult = await writeBatchMetadataToDatabase(retrievedFiles)
+    // backup zst files in the corresponding temp/cid-files/batch_x folder to backblaze
+    const batchBackupResult = await backupCidsToBackblazeBucket(batchNo)
+    // if the backup is successful clean up temp folder by deleting batch
+    if (batchBackupResult && batchBackupResult.success) {
+      if (Fs.existsSync(`${CID_TMP_DIR}/batch_${batchNo}`)) {
+        await Fs.rm(`${CID_TMP_DIR}/batch_${batchNo}`, { recursive: true, force: true })
+      }
+    }
+    let result = null
+    if (databaseWriteResult && batchBackupResult) {
+      result = {
+        batchNo: batchNo,
+        databaseWriteResult: databaseWriteResult,
+        batchBackupResult: batchBackupResult
+      }
+    }
+    // return results to the main thread
+    return await new Promise((resolve, reject) => {
+      if (result) {
+        resolve(result)
+      } else {
+        reject(new Error(`Batch ${batchNo} completed without a usable result`))
+      }
+    })
+  } catch (e) {
+    console.log('============================ [Function: processManifestBatch]')
+    console.log(e)
+  }
+}
+
+// /////////////////////////////////////////////////////////// Initialize Worker
+// -----------------------------------------------------------------------------
+InitializeWorker(processManifestBatch)
diff
--git a/packages/be/crons/cid-importer.js b/packages/be/crons/cid-importer.js index a9f9e8c5..bd655e3b 100644 --- a/packages/be/crons/cid-importer.js +++ b/packages/be/crons/cid-importer.js @@ -17,14 +17,16 @@ const Util = require('util') const Stream = require('stream') const Pipeline = Util.promisify(Stream.pipeline) const readline = require('node:readline') -const Spawn = require('child_process').spawn const argv = require('minimist')(process.argv.slice(2)) + require('dotenv').config({ path: Path.resolve(__dirname, '../.env') }) const MC = require('../config') const CID_TMP_DIR = Path.resolve(`${MC.packageRoot}/tmp/cid-files`) +let manifestLength = 0 + // ////////////////////////////////////////////////////////////////// Initialize MC.app = Express() @@ -50,176 +52,74 @@ try { // ///////////////////////////////////////////////////////////////////// Modules require('@Module_Database') -require('@Module_Cid') -const { GetFileFromDisk, SecondsToHms } = require('@Module_Utilities') +require('@Module_Cidtemp') +const { SecondsToHms } = require('@Module_Utilities') +const { CreateWorkerPool } = require('@Root/scripts/worker-pool-batch-processor.js') // /////////////////////////////////////////////////////////////////// Functions -// ------------------------------------------------------------ retrieveCidFiles -const retrieveCidFile = async (line, retryNo = 0) => { - try { - if (retryNo > 0) { - console.log(`Retry number ${retryNo}`) - } - const upload = JSON.parse(line) - // fetch file using upload cid - const response = await Axios.get(`https://${upload.cid}.ipfs.w3s.link/`, { - responseType: 'stream' - }) - // if a file already exists with this name in the temp folder, - // delete it to make way for an updated version - await deleteTemporaryFile(upload.name) - // write file data to new zst file in the temp folder - await Pipeline(response.data, Fs.createWriteStream(`${CID_TMP_DIR}/${upload.name}`)) - // unpack the zst and return the inner json - return await unpackRetrievedFile({ - cid: upload.cid, - name: upload.name, - updated: upload.updated, - created: upload.created - }) - } catch (e) { - console.log('====================================== [Function: unpackCids]') - console.log(e) - if (retryNo < 10) { - console.log(`Error retrieving CID ${JSON.parse(line).cid}. Retrying retrieval...`) - await retrieveCidFile(line, retryNo + 1) - } else { - const cid = JSON.parse(line).cid - console.log(`Could not retrieve CID ${cid}. 
Max retries reached.`) - await cacheFailedCID(cid) +// ------------------------------------------------------ logCurrentImportTotals +const logCurrentImportTotals = (currentResult, batchNo, resultsToDate) => { + const len = resultsToDate.length + let dbTotalImported = 0 + let dbTotalModified = 0 + for (let i = 0; i < len; i++) { + const result = resultsToDate[i] + if (result && result.databaseWriteResult) { + const newlyImported = result.databaseWriteResult.nUpserted || 0 + const newlyModified = result.databaseWriteResult.nModified || 0 + dbTotalImported = dbTotalImported + newlyImported + dbTotalModified = dbTotalModified + newlyModified } } -} - -// --------------------------------------------------------------- unpackZstFile -const unpackZstFile = (file) => { - return new Promise((resolve, reject) => { - const unzstd = Spawn('unzstd', [`../tmp/cid-files/${file.name}`]) - const errors = [] - unzstd.stderr.on('data', (msg) => { - errors.push(`Error unpacking ${file.name}: ${msg.toString()}`) - }) - unzstd.on('exit', (code) => { - const err = errors.length > 0 && code !== 0 - if (err) { - console.log(errors.join('\n')) - } - resolve() - }) - }) -} - -// --------------------------------------------------------- unpackRetrievedFile -const unpackRetrievedFile = async (file) => { - try { - const jsonFilename = file.name.substring(0, file.name.length - 4) - await deleteTemporaryFile(jsonFilename) - await unpackZstFile(file) - await deleteTemporaryFile(file.name) - const json = await GetFileFromDisk(`${CID_TMP_DIR}/${jsonFilename}`, true) - const fileMetadata = { - piece_cid: json.piece_cid, - payload_cid: json.payload_cid, - raw_car_file_size: json.raw_car_file_size, - dataset_slug: json.dataset, - filename: jsonFilename, - web3storageCreatedAt: file.created, - web3storageUpdatedAt: file.updated - } - await deleteTemporaryFile(jsonFilename) - return fileMetadata - } catch (e) { - console.log('============================ [Function: unpackRetrievedFiles]') - console.log(e) + if ( + typeof currentResult === 'object' && + typeof currentResult.databaseWriteResult === 'object' && + typeof currentResult.batchBackupResult === 'object' + ) { + const currentImportStats = `${currentResult.databaseWriteResult.nUpserted + currentResult.databaseWriteResult.nModified} CIDs were imported to the db in this batch` + console.log(` ${currentImportStats} | ${currentResult.batchBackupResult.message} | ${dbTotalImported + dbTotalModified} of ${manifestLength} CIDs completed`) } } -// ------------------------------------------------- writeFileMetadataToDatabase -const writeFileMetadataToDatabase = async (retrievedFiles) => { - try { - const operations = [] - const len = retrievedFiles.length - for (let i = 0; i < len; i++) { - const file = retrievedFiles[i] - operations.push({ - updateOne: { - filter: { payload_cid: file.payload_cid }, - update: { $set: file }, - upsert: true - } - }) - } - const response = await MC.model.Cid.bulkWrite(operations, { ordered: false }) - return response.result - } catch (e) { - console.log('========================= [Function: writeCidFilesToDatabase]') - console.log(e) - } -} - -// -------------------------------------------------------------- cacheFailedCid -const cacheFailedCID = async (cid) => { - try { - await Pipeline(`${cid}\n`, Fs.createWriteStream(`${CID_TMP_DIR}/failed-cid-retrievals.txt`, { flags: 'a' })) - } catch (e) { - console.log('================================= [Function: cacheFailedCID ]') - console.log(e) +const logFinalImportResults = (results, errors) => { + 
console.log(`📒 CID import & backup finished | ${results.length} total batches processed`)
+  const len = errors.length
+  const failedBatches = []
+  for (let i = 0; i < len; i++) {
+    const error = errors[i]
+    failedBatches.push(error.batch)
   }
-}
-
-// --------------------------------------------------------- deleteTemporaryFile
-const deleteTemporaryFile = async (filename) => {
-  try {
-    if (Fs.existsSync(`${CID_TMP_DIR}/${filename}`)) {
-      Fs.unlinkSync(`${CID_TMP_DIR}/${filename}`)
-    }
-  } catch (e) {
-    console.log('============================ [Function: deleteTemporaryFile ]')
-    console.log(e)
+  const failedCids = failedBatches.flat()
+  if (failedCids.length) {
+    console.log(`${failedCids.length} CID imports/backups were unsuccessful:`)
+    console.log(failedCids)
   }
 }
 
 // ------------------------------------------------- getCidFilesFromManifestList
-const getCidFilesFromManifestList = async (importMax) => {
+const getCidFilesFromManifestList = async () => {
   try {
-    if (Fs.existsSync(`${CID_TMP_DIR}/cid-manifest.txt`)) {
-      const manifest = Fs.createReadStream(`${CID_TMP_DIR}/cid-manifest.txt`)
-      const rl = readline.createInterface({
-        input: manifest,
-        crlfDelay: Infinity
-      })
-      // import all lines from the manifest to an array
-      const manifestCidLines = []
-      for await (const line of rl) {
-        manifestCidLines.push(line)
-      }
-      // reverse the array to import oldest CIDs first
-      // begin writing to the database in batches
-      manifestCidLines.reverse()
-      let retrievedFiles = []
-      let total = 0
-      const batchSize = argv.pagesize || 1000
-      const len = manifestCidLines.length
-      for (let i = 0; i < len; i++) {
-        if (i < importMax) {
-          const line = manifestCidLines[i]
-          console.log(`Retrieving file ${i + 1} from the CID manifest list.`)
-          const retrieved = await retrieveCidFile(line)
-          if (retrieved) { retrievedFiles.push(retrieved) }
-          // write retrieved file data to the database in batches of 1000
-          if ((i + 1) % batchSize === 0) {
-            const result = await writeFileMetadataToDatabase(retrievedFiles)
-            total = total + result.nUpserted + result.nModified
-            console.log(`${result.nUpserted} new CIDs imported in this batch | ${result.nModified} CIDs updated in this batch | A total of ${total} CIDs imported/updated so far.`)
-            retrievedFiles = []
-          }
-        } else {
-          break
-        }
-      }
-      const result = await writeFileMetadataToDatabase(retrievedFiles)
-      console.log(`${result.nUpserted} new CIDs imported in this batch | ${result.nModified} CIDs updated in this batch | A total of ${total + result.nUpserted + result.nModified} CIDs were imported/updated to the database.`)
+    const manifestList = Fs.createReadStream(`${CID_TMP_DIR}/cid-manifest.txt`)
+    const rl = readline.createInterface({
+      input: manifestList,
+      crlfDelay: Infinity
+    })
+    // import all lines from the manifest to an array
+    const manifest = []
+    for await (const line of rl) {
+      manifest.push(line)
     }
+    // reverse the array to import oldest CIDs first
+    manifest.reverse()
+    manifestLength = manifest.length
+    // pass manifest on to the worker pool
+    const options = {
+      threads: argv.threads,
+      batchSize: argv.pagesize || 1000,
+      onBatchResult: logCurrentImportTotals,
+      onWorkerPoolComplete: logFinalImportResults
+    }
+    CreateWorkerPool(Path.resolve(__dirname, './cid-batch-import.js'), 'processManifestBatch', manifest, options)
   } catch (e) {
     console.log('===================== [Function: getCidFilesFromManifestList]')
     console.log(e)
@@ -276,18 +176,24 @@ const createManifestFromWeb3StorageCids = async (searchParams, maxPages, lastSav
 //
///////////////////////////////////////////////////////////////// CidImporter
 const CidImporter = async () => {
   try {
-    const start = process.hrtime()[0]
+    const startTime = process.hrtime()[0]
     const limit = argv.pagesize || 1000
     const maxPages = argv.maxpages || Infinity
     console.log(`📖 CID import started | page size of ${limit} and page maximum ${maxPages}.`)
     // Get the latest upload entry from the database
-    const mostRecentCid = await MC.model.Cid.find().sort({ web3storageCreatedAt: -1 }).limit(1)
-    const mostRecentDocument = mostRecentCid[0]
-    console.log('Most recent CID imported:')
-    console.log(mostRecentDocument)
+    const mostRecentCid = await MC.model.Cidtemp.find().sort({ web3storageCreatedAt: -1 }).limit(1)
+    let mostRecentDocument = mostRecentCid[0]
+    if (argv.all) {
+      mostRecentDocument = false
+    } else {
+      console.log('Most recent CID imported:')
+      console.log(mostRecentDocument)
+    }
     const lastSavedDate = mostRecentDocument ? new Date(mostRecentDocument.web3storageCreatedAt).getTime() : 0
     // Delete the outdated manifest file if it exists
-    await deleteTemporaryFile('cid-manifest.txt')
+    if (Fs.existsSync(`${CID_TMP_DIR}/cid-manifest.txt`)) {
+      Fs.unlinkSync(`${CID_TMP_DIR}/cid-manifest.txt`)
+    }
     /**
      * Build a manifest list of all cids not yet uploaded to the database:
      * args:
@@ -306,10 +212,9 @@ const CidImporter = async () => {
      * args:
      * limit number of entries to the database (this will only be used for test)
      */
-    await getCidFilesFromManifestList(limit * maxPages)
-    const end = process.hrtime()[0]
-    console.log(`📒 CID import finished | took ${SecondsToHms(end - start)}`)
-    process.exit(0)
+    await getCidFilesFromManifestList()
+    const endTime = process.hrtime()[0]
+    console.log(`CID manifest took ${SecondsToHms(endTime - startTime)}`)
   } catch (e) {
     console.log('===================================== [Function: CidImporter]')
     console.log(e)
diff --git a/packages/be/modules/cidtemp/index.js b/packages/be/modules/cidtemp/index.js
new file mode 100644
index 00000000..cb2ea84e
--- /dev/null
+++ b/packages/be/modules/cidtemp/index.js
@@ -0,0 +1,16 @@
+console.log('📦 [module] cid temp')
+
+// ///////////////////////////////////////////////////////////////////// Imports
+// -----------------------------------------------------------------------------
+const { RunStartupChecks } = require('@Module_Utilities')
+
+const MC = require('@Root/config')
+
+// ////////////////////////////////////////////////////////////// Startup Checks
+// -----------------------------------------------------------------------------
+const checks = []
+RunStartupChecks(checks)
+
+// //////////////////////////////////////////////////////////////// Import Model
+// -----------------------------------------------------------------------------
+MC.model.Cidtemp = require('@Module_Cidtemp/model')
diff --git a/packages/be/modules/cidtemp/model/index.js b/packages/be/modules/cidtemp/model/index.js
new file mode 100644
index 00000000..745e13b5
--- /dev/null
+++ b/packages/be/modules/cidtemp/model/index.js
@@ -0,0 +1,46 @@
+console.log('💿 [model] op_cids_temp')
+
+// ///////////////////////////////////////////////////////////////////// Imports
+// -----------------------------------------------------------------------------
+const Mongoose = require('mongoose')
+const Schema = Mongoose.Schema
+
+// ////////////////////////////////////////////////////////////////////// Schema
+// -----------------------------------------------------------------------------
+const CIDtempSchema = new Schema({
+  piece_cid: {
+    type: String,
+ required: true + }, + payload_cid: { + type: String, + required: true, + index: true + }, + raw_car_file_size: { + type: Number, + required: true + }, + dataset_slug: { + type: String, + required: true + }, + web3storageCreatedAt: { + type: Date, + required: true + }, + web3storageUpdatedAt: { + type: Date, + required: true + }, + filename: { + type: String, + required: true + } +}, { + timestamps: false +}) + +// ////////////////////////////////////////////////////////////////////// Export +// ----------------------------------------------------------------------------- +module.exports = Mongoose.model('op_cids_temp', CIDtempSchema) diff --git a/packages/be/package.json b/packages/be/package.json index 6f4c4edf..698305dc 100644 --- a/packages/be/package.json +++ b/packages/be/package.json @@ -52,6 +52,7 @@ "socket.io": "^4.5.2", "socket.io-client": "^4.5.2", "uuid": "^8.3.1", - "uuid-apikey": "^1.5.1" + "uuid-apikey": "^1.5.1", + "workerpool": "^6.4.0" } } diff --git a/packages/be/scripts/worker-pool-batch-processor.js b/packages/be/scripts/worker-pool-batch-processor.js new file mode 100644 index 00000000..2655d9d3 --- /dev/null +++ b/packages/be/scripts/worker-pool-batch-processor.js @@ -0,0 +1,113 @@ +// ///////////////////////////////////////////////////// Imports + general setup +// ----------------------------------------------------------------------------- +const WorkerPool = require('workerpool') +let startTime + +// /////////////////////////////////////////////////////////////////// Functions +// ------------------------------------ Convert Seconds To Hours|Minutes|Seconds +const SecondsToHms = (seconds) => { + const input = Number(seconds) + const h = Math.floor(input / 3600) + const m = Math.floor(input % 3600 / 60) + const s = Math.floor(input % 3600 % 60) + const hDisplay = h > 0 ? h + (h === 1 ? ' hour, ' : ' hours, ') : '' + const mDisplay = m > 0 ? m + (m === 1 ? ' minute, ' : ' minutes, ') : '' + const sDisplay = s > 0 ? s + (s === 1 ? 
' second' : ' seconds') : ''
+  return hDisplay + mDisplay + sDisplay
+}
+
+// --------------------------------------------------------------- logPoolStatus
+const logPoolStatus = (Pool, num) => {
+  const activeTasks = `${Pool.stats().activeTasks} batches currently being processed`
+  const pendingTasks = `${Pool.stats().pendingTasks} pending batches remaining`
+  console.log(`Batch ${num} finished | ${activeTasks} | ${pendingTasks}`)
+}
+
+// ------------------------------------------------------ scheduleBatchOperation
+const scheduleBatchOperation = (
+  Pool,
+  manifest,
+  operation,
+  options = {},
+  batchNo = 1,
+  tasks = [],
+  results = [],
+  errors = []
+) => {
+  try {
+    if (batchNo === 1) {
+      startTime = process.hrtime()[0]
+      console.log('🤖 Worker pool started')
+    }
+    const batchSize = options.batchSize || 10
+    const batch = manifest.slice(0, batchSize)
+    const num = batchNo
+    tasks.push(
+      Pool.exec(operation, [batch, num]).then(async (result) => {
+        if (!result) {
+          throw new Error(`An issue occurred with batch ${num}`)
+        }
+        results.push(result)
+        logPoolStatus(Pool, num)
+        if (typeof options.onBatchResult === 'function') {
+          options.onBatchResult(result, num, results)
+        }
+      }).catch((e) => {
+        errors.push({ num, batch, error: e })
+        console.log(`Error returned by worker: Could not process batch ${num}.`)
+        console.error(e)
+      })
+    )
+    // remove batch from the manifest list
+    manifest.splice(0, batchSize)
+    // If there are items remaining in the manifest list, schedule a next batch
+    // Otherwise proceed with compiling final results
+    if (manifest.length) {
+      scheduleBatchOperation(Pool, manifest, operation, options, ++batchNo, tasks, results, errors)
+    } else {
+      Promise.all(tasks).catch((e) => {
+        console.log('=========================== Error returned by worker pool')
+        console.log(e)
+        process.exit(0)
+      }).then(async () => {
+        const endTime = process.hrtime()[0]
+        console.log(`✅ Worker pool task list complete | took ${SecondsToHms(endTime - startTime)}`)
+        if (typeof options.onWorkerPoolComplete === 'function') {
+          options.onWorkerPoolComplete(results, errors)
+        }
+        Pool.terminate()
+        process.exit(0)
+      })
+    }
+  } catch (e) {
+    console.log('======================== [Function: scheduleBatchOperation]')
+    console.log(e)
+  }
+}
+
+// ------------------------------------------------------------ CreateWorkerPool
+const CreateWorkerPool = async (pathToScript, operation, manifest, options) => {
+  try {
+    const Pool = WorkerPool.pool(pathToScript, {
+      maxWorkers: options.threads || 15,
+      workerType: 'thread'
+    })
+    scheduleBatchOperation(Pool, manifest, operation, options)
+  } catch (e) {
+    console.log('============================== [Function: CreateWorkerPool]')
+    console.log(e)
+  }
+}
+
+// ------------------------------------------------------------ InitializeWorker
+const InitializeWorker = (operation) => {
+  const initOperation = {}
+  initOperation[operation.name] = operation
+  WorkerPool.worker(initOperation)
+}
+
+// --------------------------------------------------------------------- Exports
+module.exports = {
+  CreateWorkerPool,
+  InitializeWorker
+}
diff --git a/packages/be/scripts/worker-pool-batch-processor.md b/packages/be/scripts/worker-pool-batch-processor.md
new file mode 100644
index 00000000..9eaa0b7b
--- /dev/null
+++ b/packages/be/scripts/worker-pool-batch-processor.md
@@ -0,0 +1,75 @@
+## Multithread Batch Processing via Workerpool
+
+### Overview
+
+The `worker-pool-batch-processor.js` module manages a pool of Node.js workers that process large lists of data or objects in batches. It is a thin layer built on top of the [workerpool npm package](https://www.npmjs.com/package/workerpool) aimed specifically at making batch processing of large datasets quick and straightforward. The script exports two functions: `CreateWorkerPool` and `InitializeWorker`.
+
+`CreateWorkerPool` instantiates a pool of workers using the workerpool npm package, then slices batches off a manifest list one at a time and queues them for processing. The manifest list can be a list of values, objects or references to files. How this list is processed depends entirely on the script supplied to `CreateWorkerPool`, which is run by each individual worker; `CreateWorkerPool` itself is agnostic to the contents of that script and only handles delegating batches to workers and collecting the results each worker returns.
+
+The function accepts four arguments:
+
+- **pathToScript**: (String) an absolute path to a script which each worker will run
+- **operation**: (String) the name of the root function in the script that the worker will interface with
+- **manifest**: (Array) an array of data to process or use within the script
+- **options**: (Object) an object with the following options:
+  - _threads_: (Number) the number of workers to run simultaneously
+  - _batchSize_: (Number) the number of manifest items in each batch passed to a worker
+  - _onBatchResult_: (Function) a function that is called when a worker has finished a batch. It is passed three arguments by the worker:
+    - 1. result: the result returned by the worker
+    - 2. num: the batch number
+    - 3. results: all results to date, including this result as the most recent
+    These arguments would typically be used to display the worker pool's progress as each batch is completed.
+  - _onWorkerPoolComplete_: (Function) a function that is called on completion of the entire manifest list, i.e. once all workers and queued tasks have finished. This function is passed two arguments:
+    - 1. results: (Array) an array of the results from each individual batch
+    - 2. errors: (Array) an array of any errors returned by a worker while processing a batch
+
+`InitializeWorker` is a simple initialization function that takes a single argument:
+- **operation**: (Function) a function that each worker will perform on each batch it receives. The name of this function must match the operation string described above.
+
+
+### Use
+
+The module must be used with two separate scripts:
+
+One runs in the main thread and is the source of the manifest list passed to the worker pool. In this script the `CreateWorkerPool` function must be imported from the `worker-pool-batch-processor.js` module and called at the appropriate step with the arguments described above (see the example below).
+
+Likewise, the `InitializeWorker` function must be imported in a script representing all processes to be executed in a worker thread. This function must be called as the last step in the script and must be passed the top-most function in the script, whose name must exactly match the operation string passed to the `CreateWorkerPool` function.
+This function (the operation passed as an argument to `InitializeWorker`) must return a Promise which either resolves with the result of a processed batch or rejects on error or on a null result.
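+
+### Example
+
+A minimal sketch of the two-script setup described above, assuming both files sit next to `worker-pool-batch-processor.js`. The file names (`main.js`, `worker.js`) and the `double` operation are hypothetical stand-ins for a real processing script.
+
+```js
+// main.js (hypothetical) - runs in the main thread and owns the manifest
+const Path = require('path')
+const { CreateWorkerPool } = require('./worker-pool-batch-processor.js')
+
+// a manifest of 100 numbers, processed 10 at a time by up to 4 workers
+const manifest = Array.from({ length: 100 }, (_, i) => i + 1)
+
+CreateWorkerPool(Path.resolve(__dirname, './worker.js'), 'double', manifest, {
+  threads: 4,
+  batchSize: 10,
+  onBatchResult: (result, num, results) => {
+    console.log(`Batch ${num} returned ${result.length} items | ${results.length} batches completed`)
+  },
+  onWorkerPoolComplete: (results, errors) => {
+    console.log(`${results.flat().length} items processed | ${errors.length} failed batches`)
+  }
+})
+```
+
+```js
+// worker.js (hypothetical) - run inside each worker thread
+const { InitializeWorker } = require('./worker-pool-batch-processor.js')
+
+// the function name ('double') must match the operation string passed to
+// CreateWorkerPool; it must resolve with the batch result or reject on error
+const double = async (batch, batchNo) => {
+  if (!batch.length) throw new Error(`Batch ${batchNo} was empty`)
+  return batch.map(n => n * 2)
+}
+
+InitializeWorker(double)
+```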