From 46458856ec6c093ea212c8dda706d147b3965dca Mon Sep 17 00:00:00 2001 From: svvimming Date: Wed, 3 May 2023 11:56:16 -0400 Subject: [PATCH 1/8] feat: cid-importer cron split to multithread processing using worker pool --- packages/be/crons/cid-batch-import.js | 226 ++++++++++++++++++ packages/be/crons/cid-importer.js | 281 ++++++++-------------- packages/be/crons/x_cid-importer.js | 322 ++++++++++++++++++++++++++ 3 files changed, 641 insertions(+), 188 deletions(-) create mode 100644 packages/be/crons/cid-batch-import.js create mode 100644 packages/be/crons/x_cid-importer.js diff --git a/packages/be/crons/cid-batch-import.js b/packages/be/crons/cid-batch-import.js new file mode 100644 index 00000000..fa4fbebf --- /dev/null +++ b/packages/be/crons/cid-batch-import.js @@ -0,0 +1,226 @@ +// ///////////////////////////////////////////////////// Imports + general setup +// ----------------------------------------------------------------------------- +const ModuleAlias = require('module-alias') +const Path = require('path') +const Axios = require('axios') +const Fs = require('fs-extra') +const Express = require('express') +const Util = require('util') +const Stream = require('stream') +const Pipeline = Util.promisify(Stream.pipeline) +const Spawn = require('child_process').spawn +const WorkerPool = require('workerpool') +const Mongoose = require('mongoose') +const MongooseSlugUpdater = require('mongoose-slug-updater') + +require('dotenv').config({ path: Path.resolve(__dirname, '../.env') }) + +const MC = require('../config') + +const CID_TMP_DIR = Path.resolve(`${MC.packageRoot}/tmp/cid-files`) + +// ////////////////////////////////////////////////////////////////// Initialize +MC.app = Express() +Mongoose.plugin(MongooseSlugUpdater) + +// ///////////////////////////////////////////////////////////////////// Aliases +ModuleAlias.addAliases({ + '@Root': MC.packageRoot, + '@Modules': `${MC.packageRoot}/modules` +}) + +try { + const modulesRoot = `${MC.packageRoot}/modules` + const items = Fs.readdirSync(modulesRoot) + items.forEach((name) => { + const path = `${modulesRoot}/${name}` + if (Fs.statSync(path).isDirectory()) { + const moduleName = `${name[0].toUpperCase() + name.substring(1)}` + ModuleAlias.addAlias(`@Module_${moduleName}`, path) + } + }) +} catch (e) { + console.log(e) +} + +// ///////////////////////////////////////////////////////////////////// Modules +require('@Module_Database') +require('@Module_Cidtemp') + +// /////////////////////////////////////////////////////////////////// Functions +// ------------------------------------------------------------ retrieveCidFiles +const retrieveCidFile = async (line, retryNo = 0) => { + try { + if (retryNo > 0) { + console.log(`Retry number ${retryNo}`) + } + const upload = JSON.parse(line) + // fetch file using upload cid + const response = await Axios.get(`https://${upload.cid}.ipfs.w3s.link/`, { + responseType: 'stream' + }) + // if a file already exists with this name in the temp folder, + // delete it to make way for an updated version + await deleteTemporaryFile(CID_TMP_DIR, upload.name) + // write file data to new zst file in the temp folder + await Pipeline(response.data, Fs.createWriteStream(`${CID_TMP_DIR}/${upload.name}`)) + // unpack the zst and return the inner json + return await unpackRetrievedFile(CID_TMP_DIR, { + cid: upload.cid, + name: upload.name, + updated: upload.updated, + created: upload.created + }) + } catch (e) { + console.log('====================================== [Function: unpackCids]') + console.log(e) + if (retryNo < 10) { + console.log(`Error retrieving CID ${JSON.parse(line).cid}. Retrying retrieval...`) + await retrieveCidFile(CID_TMP_DIR, line, retryNo + 1) + } else { + const cid = JSON.parse(line).cid + console.log(`Could not retrieve CID ${cid}. Max retries reached.`) + await cacheFailedCID(CID_TMP_DIR, cid) + } + } +} + +// --------------------------------------------------------------- unpackZstFile +const unpackZstFile = (file) => { + return new Promise((resolve, reject) => { + const unzstd = Spawn('unzstd', [`../tmp/cid-files/${file.name}`]) + const errors = [] + unzstd.stderr.on('data', (msg) => { + errors.push(`Error unpacking ${file.name}: ${msg.toString()}`) + }) + unzstd.on('exit', (code) => { + const err = errors.length > 0 && code !== 0 + if (err) { + console.log(errors.join('\n')) + } + resolve() + }) + }) +} + +// --------------------------------------------------------- unpackRetrievedFile +const unpackRetrievedFile = async (file) => { + try { + const jsonFilename = file.name.substring(0, file.name.length - 4) + await deleteTemporaryFile(jsonFilename) + await unpackZstFile(file) + // await deleteTemporaryFile(file.name) // TODO : don't delete zst file in tmp directory + const json = await GetFileFromDisk(`${CID_TMP_DIR}/${jsonFilename}`, true) + const fileMetadata = { + piece_cid: json.piece_cid, + payload_cid: json.payload_cid, + raw_car_file_size: json.raw_car_file_size, + dataset_slug: json.dataset, + filename: jsonFilename, + web3storageCreatedAt: file.created, + web3storageUpdatedAt: file.updated + } + await deleteTemporaryFile(jsonFilename) // TODO : do delete unpacked json file in tmp directory + return fileMetadata + } catch (e) { + console.log('============================ [Function: unpackRetrievedFiles]') + console.log(e) + } +} + +// -------------------------------------------------------------- cacheFailedCid +const cacheFailedCID = async (cid) => { + try { + await Pipeline(`${cid}\n`, Fs.createWriteStream(`${CID_TMP_DIR}/failed-cid-retrievals.txt`, { flags: 'a' })) + } catch (e) { + console.log('================================= [Function: cacheFailedCID ]') + console.log(e) + } +} + +// --------------------------------------------------------- deleteTemporaryFile +const deleteTemporaryFile = async (filename) => { + try { + if (Fs.existsSync(`${CID_TMP_DIR}/${filename}`)) { + Fs.unlinkSync(`${CID_TMP_DIR}/${filename}`) + } + } catch (e) { + console.log('============================ [Function: deleteTemporaryFile ]') + console.log(e) + } +} + +// ------------------------------------------------ writeBatchMetadataToDatabase +const writeBatchMetadataToDatabase = async (retrievedFiles) => { + try { + const operations = [] + const len = retrievedFiles.length + for (let i = 0; i < len; i++) { + const file = retrievedFiles[i] + operations.push({ + updateOne: { + filter: { payload_cid: file.payload_cid }, + update: { $set: file }, + upsert: true + } + }) + } + const response = await MC.model.Cidtemp.bulkWrite(operations, { ordered: false }) + return response.result + } catch (e) { + console.log('==================== [Function: writeBatchMetadataToDatabase]') + console.log(e) + } +} + +// ------------------------------------------------ backupFilesToBackblazeBucket +const backupFilesToBackblazeBucket = async (retrievedFiles) => { + try { + console.log('backup started') + } catch (e) { + console.log('==================== [Function: backupFilesToBackblazeBucket]') + console.log(e) + } +} + +// ////////////////////////////////////////////////////////////////////// Worker +// -------------------------------------------------------- processManifestBatch +const processManifestBatch = async (batch, batchNo) => { + try { + const len = batch.length + const uploaded = Math.floor(Math.random() * len) + const modified = len - uploaded + console.log(len, uploaded, modified) + // const retrievedFiles = [] + // for (let i = 0; i < len; i++) { + // const cidManifestItem = batch[i] + // const retrieved = await retrieveCidFile(cidManifestItem) + // if (retrieved) { retrievedFiles.push(retrieved) } + // } + // console.log(retrievedFiles) + // const databaseWriteResult = await writeBatchMetadataToDatabase(retrievedFiles) + // const batchBackupResult = await backupFilesToBackblazeBucket(retrievedFiles) + return new Promise((resolve, reject) => { + setTimeout(() => { + const result = new WorkerPool.Transfer({ + batch: batchNo, + databaseWriteResult: { + nUpserted: uploaded, + nModified: modified + }, + batchBackupResult: false + }) + resolve(result) + }, 3000) + }) + } catch (e) { + console.log('============================ [Function: processManifestBatch]') + console.log(e) + } +} + +// ////////////////////////////////////////////////////////////////// Initialize +// ----------------------------------------------------------------------------- +WorkerPool.worker({ + processManifestBatch: processManifestBatch +}) diff --git a/packages/be/crons/cid-importer.js b/packages/be/crons/cid-importer.js index a9f9e8c5..a0b61b16 100644 --- a/packages/be/crons/cid-importer.js +++ b/packages/be/crons/cid-importer.js @@ -17,14 +17,23 @@ const Util = require('util') const Stream = require('stream') const Pipeline = Util.promisify(Stream.pipeline) const readline = require('node:readline') -const Spawn = require('child_process').spawn const argv = require('minimist')(process.argv.slice(2)) +const WorkerPool = require('workerpool') + require('dotenv').config({ path: Path.resolve(__dirname, '../.env') }) const MC = require('../config') const CID_TMP_DIR = Path.resolve(`${MC.packageRoot}/tmp/cid-files`) +const numThreads = argv.threads || 16 +const Pool = WorkerPool.pool(Path.resolve(__dirname, './cid-batch-import.js'), { + maxWorkers: numThreads, + workerType: 'thread' +}) + +let startTime + // ////////////////////////////////////////////////////////////////// Initialize MC.app = Express() @@ -50,176 +59,72 @@ try { // ///////////////////////////////////////////////////////////////////// Modules require('@Module_Database') -require('@Module_Cid') -const { GetFileFromDisk, SecondsToHms } = require('@Module_Utilities') +require('@Module_Cidtemp') +const { SecondsToHms } = require('@Module_Utilities') // /////////////////////////////////////////////////////////////////// Functions -// ------------------------------------------------------------ retrieveCidFiles -const retrieveCidFile = async (line, retryNo = 0) => { +// --------------------------------------------------- performCidBatchOperations +const performCidBatchOperations = (manifest, batchNo, tasks, results) => { try { - if (retryNo > 0) { - console.log(`Retry number ${retryNo}`) - } - const upload = JSON.parse(line) - // fetch file using upload cid - const response = await Axios.get(`https://${upload.cid}.ipfs.w3s.link/`, { - responseType: 'stream' - }) - // if a file already exists with this name in the temp folder, - // delete it to make way for an updated version - await deleteTemporaryFile(upload.name) - // write file data to new zst file in the temp folder - await Pipeline(response.data, Fs.createWriteStream(`${CID_TMP_DIR}/${upload.name}`)) - // unpack the zst and return the inner json - return await unpackRetrievedFile({ - cid: upload.cid, - name: upload.name, - updated: upload.updated, - created: upload.created - }) - } catch (e) { - console.log('====================================== [Function: unpackCids]') - console.log(e) - if (retryNo < 10) { - console.log(`Error retrieving CID ${JSON.parse(line).cid}. Retrying retrieval...`) - await retrieveCidFile(line, retryNo + 1) + const batchSize = argv.pagesize || 1000 + const batch = manifest.slice(0, batchSize) + tasks.push( + Pool.exec('processManifestBatch', [batch, batchNo]).then((result) => { + results.push(result) + imported = result.databaseWriteResult.nUpserted + result.databaseWriteResult.nModified + console.log(`Batch ${result.batch} finished | ${imported} CIDs were imported to the database in this batch`) + }).catch((err) => { + console.log('================================ Error returned by worker') + console.error(err) + }) + ) + manifest.splice(0, batchSize) + if (manifest.length) { + performCidBatchOperations(manifest, ++batchNo, tasks, results) } else { - const cid = JSON.parse(line).cid - console.log(`Could not retrieve CID ${cid}. Max retries reached.`) - await cacheFailedCID(cid) - } - } -} - -// --------------------------------------------------------------- unpackZstFile -const unpackZstFile = (file) => { - return new Promise((resolve, reject) => { - const unzstd = Spawn('unzstd', [`../tmp/cid-files/${file.name}`]) - const errors = [] - unzstd.stderr.on('data', (msg) => { - errors.push(`Error unpacking ${file.name}: ${msg.toString()}`) - }) - unzstd.on('exit', (code) => { - const err = errors.length > 0 && code !== 0 - if (err) { - console.log(errors.join('\n')) - } - resolve() - }) - }) -} - -// --------------------------------------------------------- unpackRetrievedFile -const unpackRetrievedFile = async (file) => { - try { - const jsonFilename = file.name.substring(0, file.name.length - 4) - await deleteTemporaryFile(jsonFilename) - await unpackZstFile(file) - await deleteTemporaryFile(file.name) - const json = await GetFileFromDisk(`${CID_TMP_DIR}/${jsonFilename}`, true) - const fileMetadata = { - piece_cid: json.piece_cid, - payload_cid: json.payload_cid, - raw_car_file_size: json.raw_car_file_size, - dataset_slug: json.dataset, - filename: jsonFilename, - web3storageCreatedAt: file.created, - web3storageUpdatedAt: file.updated - } - await deleteTemporaryFile(jsonFilename) - return fileMetadata - } catch (e) { - console.log('============================ [Function: unpackRetrievedFiles]') - console.log(e) - } -} - -// ------------------------------------------------- writeFileMetadataToDatabase -const writeFileMetadataToDatabase = async (retrievedFiles) => { - try { - const operations = [] - const len = retrievedFiles.length - for (let i = 0; i < len; i++) { - const file = retrievedFiles[i] - operations.push({ - updateOne: { - filter: { payload_cid: file.payload_cid }, - update: { $set: file }, - upsert: true + Promise.all(tasks).catch((e) => { + console.log('=========================== Error returned by worker pool') + console.log(e) + }).then(() => { + Pool.terminate() + const len = results.length + console.log(results) + let dbTotalImported = 0 + let dbTotalModified = 0 + for (let i = 0; i < len; i++) { + const result = results[i] + dbTotalImported = dbTotalImported + result.databaseWriteResult.nUpserted + dbTotalModified = dbTotalModified + result.databaseWriteResult.nModified } + const endTime = process.hrtime()[0] + console.log(`📒 CID import finished | took ${SecondsToHms(endTime - startTime)}`) + console.log(`A total of ${dbTotalImported + dbTotalModified} CIDs were processed by the database | ${dbTotalImported} imported | ${dbTotalModified} modified`) + process.exit(0) }) } - const response = await MC.model.Cid.bulkWrite(operations, { ordered: false }) - return response.result - } catch (e) { - console.log('========================= [Function: writeCidFilesToDatabase]') - console.log(e) - } -} - -// -------------------------------------------------------------- cacheFailedCid -const cacheFailedCID = async (cid) => { - try { - await Pipeline(`${cid}\n`, Fs.createWriteStream(`${CID_TMP_DIR}/failed-cid-retrievals.txt`, { flags: 'a' })) - } catch (e) { - console.log('================================= [Function: cacheFailedCID ]') - console.log(e) - } -} - -// --------------------------------------------------------- deleteTemporaryFile -const deleteTemporaryFile = async (filename) => { - try { - if (Fs.existsSync(`${CID_TMP_DIR}/${filename}`)) { - Fs.unlinkSync(`${CID_TMP_DIR}/${filename}`) - } } catch (e) { - console.log('============================ [Function: deleteTemporaryFile ]') + console.log('======================= [Function: performCidBatchOperations]') console.log(e) } } // ------------------------------------------------- getCidFilesFromManifestList -const getCidFilesFromManifestList = async (importMax) => { +const getCidFilesFromManifestList = async () => { try { - if (Fs.existsSync(`${CID_TMP_DIR}/cid-manifest.txt`)) { - const manifest = Fs.createReadStream(`${CID_TMP_DIR}/cid-manifest.txt`) - const rl = readline.createInterface({ - input: manifest, - crlfDelay: Infinity - }) - // import all lines from the manifest to an array - const manifestCidLines = [] - for await (const line of rl) { - manifestCidLines.push(line) - } - // reverse the array to import oldest CIDs first - // begin writing to the database in batches - manifestCidLines.reverse() - let retrievedFiles = [] - let total = 0 - const batchSize = argv.pagesize || 1000 - const len = manifestCidLines.length - for (let i = 0; i < len; i++) { - if (i < importMax) { - const line = manifestCidLines[i] - console.log(`Retrieving file ${i + 1} from the CID manifest list.`) - const retrieved = await retrieveCidFile(line) - if (retrieved) { retrievedFiles.push(retrieved) } - // write retrieved file data to the database in batches of 1000 - if ((i + 1) % batchSize === 0) { - const result = await writeFileMetadataToDatabase(retrievedFiles) - total = total + result.nUpserted + result.nModified - console.log(`${result.nUpserted} new CIDs imported in this batch | ${result.nModified} CIDs updated in this batch | A total of ${total} CIDs imported/updated so far.`) - retrievedFiles = [] - } - } else { - break - } - } - const result = await writeFileMetadataToDatabase(retrievedFiles) - console.log(`${result.nUpserted} new CIDs imported in this batch | ${result.nModified} CIDs updated in this batch | A total of ${total + result.nUpserted + result.nModified} CIDs were imported/updated to the database.`) + const manifestList = Fs.createReadStream(`${CID_TMP_DIR}/cid-manifest.txt`) + const rl = readline.createInterface({ + input: manifestList, + crlfDelay: Infinity + }) + // import all lines from the manifest to an array + const manifest = [] + for await (const line of rl) { + manifest.push(line) } + // reverse the array to import oldest CIDs first + manifest.reverse() + // pass manifest on to the worker pool + performCidBatchOperations(manifest, 1, [], []) } catch (e) { console.log('===================== [Function: getCidFilesFromManifestList]') console.log(e) @@ -276,40 +181,40 @@ const createManifestFromWeb3StorageCids = async (searchParams, maxPages, lastSav // ///////////////////////////////////////////////////////////////// CidImporter const CidImporter = async () => { try { - const start = process.hrtime()[0] + startTime = process.hrtime()[0] const limit = argv.pagesize || 1000 const maxPages = argv.maxpages || Infinity - console.log(`📖 CID import started | page size of ${limit} and page maximum ${maxPages}.`) - // Get the latest upload entry from the database - const mostRecentCid = await MC.model.Cid.find().sort({ web3storageCreatedAt: -1 }).limit(1) - const mostRecentDocument = mostRecentCid[0] - console.log('Most recent CID imported:') - console.log(mostRecentDocument) - const lastSavedDate = mostRecentDocument ? new Date(mostRecentDocument.web3storageCreatedAt).getTime() : 0 - // Delete the outdated manifest file if it exists - await deleteTemporaryFile('cid-manifest.txt') - /** - * Build a manifest list of all cids not yet uploaded to the database: - * args: - * params passed to the initial api upload list request - * limit number of pages retrieved. Set to Infinity to retrieve all CIDs since the most recently uploaded saved in the db - * the most recent upload saved to our database (so as to request only more newer uploads) - */ - await createManifestFromWeb3StorageCids({ - size: limit, - page: 1, - sortBy: 'Date', - sortOrder: 'Desc' - }, maxPages, lastSavedDate) - /** - * Retrieve and unpack files one by one from the manifest list and bulkWrite contents to the database - * args: - * limit number of entries to the database (this will only be used for test) - */ - await getCidFilesFromManifestList(limit * maxPages) - const end = process.hrtime()[0] - console.log(`📒 CID import finished | took ${SecondsToHms(end - start)}`) - process.exit(0) + // console.log(`📖 CID import started | page size of ${limit} and page maximum ${maxPages}.`) + // // Get the latest upload entry from the database + // // const mostRecentCid = await MC.model.Cidtemp.find().sort({ web3storageCreatedAt: -1 }).limit(1) // TODO : uncomment line and replace with override option + // const mostRecentDocument = false // mostRecentCid[0] + // console.log('Most recent CID imported:') + // console.log(mostRecentDocument) + // const lastSavedDate = mostRecentDocument ? new Date(mostRecentDocument.web3storageCreatedAt).getTime() : 0 + // // Delete the outdated manifest file if it exists + // // await deleteTemporaryFile('cid-manifest.txt') + // if (Fs.existsSync(`${CID_TMP_DIR}/cid-manifest.txt`)) { + // Fs.unlinkSync(`${CID_TMP_DIR}/cid-manifest.txt`) + // } + // /** + // * Build a manifest list of all cids not yet uploaded to the database: + // * args: + // * params passed to the initial api upload list request + // * limit number of pages retrieved. Set to Infinity to retrieve all CIDs since the most recently uploaded saved in the db + // * the most recent upload saved to our database (so as to request only more newer uploads) + // */ + // await createManifestFromWeb3StorageCids({ + // size: limit, + // page: 1, + // sortBy: 'Date', + // sortOrder: 'Desc' + // }, maxPages, lastSavedDate) + // /** + // * Retrieve and unpack files one by one from the manifest list and bulkWrite contents to the database + // * args: + // * limit number of entries to the database (this will only be used for test) + // */ + await getCidFilesFromManifestList() } catch (e) { console.log('===================================== [Function: CidImporter]') console.log(e) diff --git a/packages/be/crons/x_cid-importer.js b/packages/be/crons/x_cid-importer.js new file mode 100644 index 00000000..9721378f --- /dev/null +++ b/packages/be/crons/x_cid-importer.js @@ -0,0 +1,322 @@ +/** + * + * ⏱️️ [Cron | weekly] CID Importer + * + * Caches CID data + * + */ + +// ///////////////////////////////////////////////////// Imports + general setup +// ----------------------------------------------------------------------------- +const ModuleAlias = require('module-alias') +const Path = require('path') +const Axios = require('axios') +const Fs = require('fs-extra') +const Express = require('express') +const Util = require('util') +const Stream = require('stream') +const Pipeline = Util.promisify(Stream.pipeline) +const readline = require('node:readline') +const Spawn = require('child_process').spawn +const argv = require('minimist')(process.argv.slice(2)) +require('dotenv').config({ path: Path.resolve(__dirname, '../.env') }) + +const MC = require('../config') + +const CID_TMP_DIR = Path.resolve(`${MC.packageRoot}/tmp/cid-files`) + +// ////////////////////////////////////////////////////////////////// Initialize +MC.app = Express() + +// ///////////////////////////////////////////////////////////////////// Aliases +ModuleAlias.addAliases({ + '@Root': MC.packageRoot, + '@Modules': `${MC.packageRoot}/modules` +}) + +try { + const modulesRoot = `${MC.packageRoot}/modules` + const items = Fs.readdirSync(modulesRoot) + items.forEach((name) => { + const path = `${modulesRoot}/${name}` + if (Fs.statSync(path).isDirectory()) { + const moduleName = `${name[0].toUpperCase() + name.substring(1)}` + ModuleAlias.addAlias(`@Module_${moduleName}`, path) + } + }) +} catch (e) { + console.log(e) +} + +// ///////////////////////////////////////////////////////////////////// Modules +require('@Module_Database') +require('@Module_Cidtemp') +const { GetFileFromDisk, SecondsToHms } = require('@Module_Utilities') + +// /////////////////////////////////////////////////////////////////// Functions +// ------------------------------------------------------------ retrieveCidFiles +const retrieveCidFile = async (line, retryNo = 0) => { + try { + if (retryNo > 0) { + console.log(`Retry number ${retryNo}`) + } + const upload = JSON.parse(line) + // fetch file using upload cid + const response = await Axios.get(`https://${upload.cid}.ipfs.w3s.link/`, { + responseType: 'stream' + }) + // if a file already exists with this name in the temp folder, + // delete it to make way for an updated version + await deleteTemporaryFile(upload.name) + // write file data to new zst file in the temp folder + await Pipeline(response.data, Fs.createWriteStream(`${CID_TMP_DIR}/${upload.name}`)) + // unpack the zst and return the inner json + return await unpackRetrievedFile({ + cid: upload.cid, + name: upload.name, + updated: upload.updated, + created: upload.created + }) + } catch (e) { + console.log('====================================== [Function: unpackCids]') + console.log(e) + if (retryNo < 10) { + console.log(`Error retrieving CID ${JSON.parse(line).cid}. Retrying retrieval...`) + await retrieveCidFile(line, retryNo + 1) + } else { + const cid = JSON.parse(line).cid + console.log(`Could not retrieve CID ${cid}. Max retries reached.`) + await cacheFailedCID(cid) + } + } +} + +// --------------------------------------------------------------- unpackZstFile +const unpackZstFile = (file) => { + return new Promise((resolve, reject) => { + const unzstd = Spawn('unzstd', [`../tmp/cid-files/${file.name}`]) + const errors = [] + unzstd.stderr.on('data', (msg) => { + errors.push(`Error unpacking ${file.name}: ${msg.toString()}`) + }) + unzstd.on('exit', (code) => { + const err = errors.length > 0 && code !== 0 + if (err) { + console.log(errors.join('\n')) + } + resolve() + }) + }) +} + +// --------------------------------------------------------- unpackRetrievedFile +const unpackRetrievedFile = async (file) => { + try { + const jsonFilename = file.name.substring(0, file.name.length - 4) + await deleteTemporaryFile(jsonFilename) + await unpackZstFile(file) + await deleteTemporaryFile(file.name) + const json = await GetFileFromDisk(`${CID_TMP_DIR}/${jsonFilename}`, true) + const fileMetadata = { + piece_cid: json.piece_cid, + payload_cid: json.payload_cid, + raw_car_file_size: json.raw_car_file_size, + dataset_slug: json.dataset, + filename: jsonFilename, + web3storageCreatedAt: file.created, + web3storageUpdatedAt: file.updated + } + await deleteTemporaryFile(jsonFilename) + return fileMetadata + } catch (e) { + console.log('============================ [Function: unpackRetrievedFiles]') + console.log(e) + } +} + +// ------------------------------------------------- writeFileMetadataToDatabase +const writeFileMetadataToDatabase = async (retrievedFiles) => { + try { + const operations = [] + const len = retrievedFiles.length + for (let i = 0; i < len; i++) { + const file = retrievedFiles[i] + operations.push({ + updateOne: { + filter: { payload_cid: file.payload_cid }, + update: { $set: file }, + upsert: true + } + }) + } + const response = await MC.model.Cidtemp.bulkWrite(operations, { ordered: false }) + return response.result + } catch (e) { + console.log('========================= [Function: writeCidFilesToDatabase]') + console.log(e) + } +} + +// -------------------------------------------------------------- cacheFailedCid +const cacheFailedCID = async (cid) => { + try { + await Pipeline(`${cid}\n`, Fs.createWriteStream(`${CID_TMP_DIR}/failed-cid-retrievals.txt`, { flags: 'a' })) + } catch (e) { + console.log('================================= [Function: cacheFailedCID ]') + console.log(e) + } +} + +// --------------------------------------------------------- deleteTemporaryFile +const deleteTemporaryFile = async (filename) => { + try { + if (Fs.existsSync(`${CID_TMP_DIR}/${filename}`)) { + Fs.unlinkSync(`${CID_TMP_DIR}/${filename}`) + } + } catch (e) { + console.log('============================ [Function: deleteTemporaryFile ]') + console.log(e) + } +} + +// ------------------------------------------------- getCidFilesFromManifestList +const getCidFilesFromManifestList = async (importMax) => { + try { + if (Fs.existsSync(`${CID_TMP_DIR}/cid-manifest.txt`)) { + const manifest = Fs.createReadStream(`${CID_TMP_DIR}/cid-manifest.txt`) + const rl = readline.createInterface({ + input: manifest, + crlfDelay: Infinity + }) + // import all lines from the manifest to an array + const manifestCidLines = [] + for await (const line of rl) { + manifestCidLines.push(line) + } + // reverse the array to import oldest CIDs first + // begin writing to the database in batches + manifestCidLines.reverse() + let retrievedFiles = [] + let total = 0 + const batchSize = argv.pagesize || 1000 + const len = manifestCidLines.length + for (let i = 0; i < len; i++) { + if (i < importMax) { + const line = manifestCidLines[i] + console.log(`Retrieving file ${i + 1} from the CID manifest list.`) + const retrieved = await retrieveCidFile(line) + if (retrieved) { retrievedFiles.push(retrieved) } + // write retrieved file data to the database in batches of 1000 + if ((i + 1) % batchSize === 0) { + const result = await writeFileMetadataToDatabase(retrievedFiles) + total = total + result.nUpserted + result.nModified + console.log(`${result.nUpserted} new CIDs imported in this batch | ${result.nModified} CIDs updated in this batch | A total of ${total} CIDs imported/updated so far.`) + retrievedFiles = [] + } + } else { + break + } + } + const result = await writeFileMetadataToDatabase(retrievedFiles) + console.log(`${result.nUpserted} new CIDs imported in this batch | ${result.nModified} CIDs updated in this batch | A total of ${total + result.nUpserted + result.nModified} CIDs were imported/updated to the database.`) + } + } catch (e) { + console.log('===================== [Function: getCidFilesFromManifestList]') + console.log(e) + } +} + +// ------------------------------------------- createManifestFromWeb3StorageCids +const createManifestFromWeb3StorageCids = async (searchParams, maxPages, lastSavedDate, count) => { + try { + const options = { headers: { Accept: 'application/json', Authorization: `Bearer ${process.env.WEB3STORAGE_TOKEN}` } } + const params = Object.keys(searchParams).map((item) => `${item}=${searchParams[item]}`).join('&') + const url = `https://api.web3.storage/user/uploads?${params}` + const response = await Axios.get(url, options) + const uploads = response.data + const len = uploads.length + let skipCount = 0 + let total = 0 + let lastSavedDateReached = false + for (let i = 0; i < len; i++) { + const upload = uploads[i] + const uploadDate = new Date(upload.created).getTime() + if (uploadDate > lastSavedDate) { + if (upload.name.endsWith('.zst')) { + const newline = JSON.stringify({ cid: upload.cid, name: upload.name, created: upload.created, updated: upload.updated }) + await Pipeline(`${newline}\n`, Fs.createWriteStream(`${CID_TMP_DIR}/cid-manifest.txt`, { flags: 'a' })) + } else { + skipCount++ + } + } else { + lastSavedDateReached = true + break + } + total = i + 1 + } + total = total - skipCount + const lineWriteCount = count ? count + total : total + console.log(`${total} new CID(s) saved to the manifest in this batch | ${skipCount} file(s) were skipped due to filetype mismatch | total of ${lineWriteCount} new CID(s) saved so far`) + if (uploads.length === searchParams.size && !lastSavedDateReached && searchParams.page < maxPages) { + searchParams.page += 1 + await createManifestFromWeb3StorageCids(searchParams, maxPages, lastSavedDate, lineWriteCount) + } else { + console.log(`Finished writing CID manifest file - a total of ${lineWriteCount} new CID(s) will be imported to the database.`) + } + } catch (e) { + console.log('=============== [Function: createManifestFromWeb3StorageCids]') + console.log(e) + if (e.response && e.response.status === 500) { + await createManifestFromWeb3StorageCids(searchParams, maxPages, lastSavedDate, count) + console.log(`Retrying fetching uploads on page ${searchParams.page}`) + } + } +} + +// ///////////////////////////////////////////////////////////////// CidImporter +const CidImporter = async () => { + try { + const start = process.hrtime()[0] + const limit = argv.pagesize || 1000 + const maxPages = argv.maxpages || Infinity + console.log(`📖 CID import started | page size of ${limit} and page maximum ${maxPages}.`) + // Get the latest upload entry from the database + const mostRecentCid = await MC.model.Cidtemp.find().sort({ web3storageCreatedAt: -1 }).limit(1) + const mostRecentDocument = mostRecentCid[0] + console.log('Most recent CID imported:') + console.log(mostRecentDocument) + const lastSavedDate = mostRecentDocument ? new Date(mostRecentDocument.web3storageCreatedAt).getTime() : 0 + // Delete the outdated manifest file if it exists + await deleteTemporaryFile('cid-manifest.txt') + /** + * Build a manifest list of all cids not yet uploaded to the database: + * args: + * params passed to the initial api upload list request + * limit number of pages retrieved. Set to Infinity to retrieve all CIDs since the most recently uploaded saved in the db + * the most recent upload saved to our database (so as to request only more newer uploads) + */ + await createManifestFromWeb3StorageCids({ + size: limit, + page: 1, + sortBy: 'Date', + sortOrder: 'Desc' + }, maxPages, lastSavedDate) + /** + * Retrieve and unpack files one by one from the manifest list and bulkWrite contents to the database + * args: + * limit number of entries to the database (this will only be used for test) + */ + await getCidFilesFromManifestList(limit * maxPages) + const end = process.hrtime()[0] + console.log(`📒 CID import finished | took ${SecondsToHms(end - start)}`) + process.exit(0) + } catch (e) { + console.log('===================================== [Function: CidImporter]') + console.log(e) + process.exit(0) + } +} + +// ////////////////////////////////////////////////////////////////// Initialize +// ----------------------------------------------------------------------------- +MC.app.on('mongoose-connected', CidImporter) From 49c1bddd8f8601402cb37e1556563bb13ef530e6 Mon Sep 17 00:00:00 2001 From: svvimming Date: Wed, 3 May 2023 12:01:05 -0400 Subject: [PATCH 2/8] feat: move workerpool dependency to be package --- package-lock.json | 16 +++++++- packages/be/modules/cidtemp/index.js | 16 ++++++++ packages/be/modules/cidtemp/model/index.js | 46 ++++++++++++++++++++++ packages/be/package.json | 3 +- 4 files changed, 78 insertions(+), 3 deletions(-) create mode 100644 packages/be/modules/cidtemp/index.js create mode 100644 packages/be/modules/cidtemp/model/index.js diff --git a/package-lock.json b/package-lock.json index 61195e98..3e4642fa 100644 --- a/package-lock.json +++ b/package-lock.json @@ -29253,6 +29253,11 @@ "microevent.ts": "~0.1.1" } }, + "node_modules/workerpool": { + "version": "6.4.0", + "resolved": "https://registry.npmjs.org/workerpool/-/workerpool-6.4.0.tgz", + "integrity": "sha512-i3KR1mQMNwY2wx20ozq2EjISGtQWDIfV56We+yGJ5yDs8jTwQiLLaqHlkBHITlCuJnYlVRmXegxFxZg7gqI++A==" + }, "node_modules/wrap-ansi": { "version": "7.0.0", "resolved": "https://registry.npmjs.org/wrap-ansi/-/wrap-ansi-7.0.0.tgz", @@ -29608,7 +29613,8 @@ "socket.io": "^4.5.2", "socket.io-client": "^4.5.2", "uuid": "^8.3.1", - "uuid-apikey": "^1.5.1" + "uuid-apikey": "^1.5.1", + "workerpool": "^6.4.0" }, "devDependencies": { "eslint": "^8.23.0", @@ -37466,7 +37472,8 @@ "socket.io": "^4.5.2", "socket.io-client": "^4.5.2", "uuid": "^8.3.1", - "uuid-apikey": "^1.5.1" + "uuid-apikey": "^1.5.1", + "workerpool": "*" } }, "better-opn": { @@ -52182,6 +52189,11 @@ "microevent.ts": "~0.1.1" } }, + "workerpool": { + "version": "6.4.0", + "resolved": "https://registry.npmjs.org/workerpool/-/workerpool-6.4.0.tgz", + "integrity": "sha512-i3KR1mQMNwY2wx20ozq2EjISGtQWDIfV56We+yGJ5yDs8jTwQiLLaqHlkBHITlCuJnYlVRmXegxFxZg7gqI++A==" + }, "wrap-ansi": { "version": "7.0.0", "resolved": "https://registry.npmjs.org/wrap-ansi/-/wrap-ansi-7.0.0.tgz", diff --git a/packages/be/modules/cidtemp/index.js b/packages/be/modules/cidtemp/index.js new file mode 100644 index 00000000..cb2ea84e --- /dev/null +++ b/packages/be/modules/cidtemp/index.js @@ -0,0 +1,16 @@ +console.log('📦 [module] cid temp') + +// ///////////////////////////////////////////////////////////////////// Imports +// ----------------------------------------------------------------------------- +const { RunStartupChecks } = require('@Module_Utilities') + +const MC = require('@Root/config') + +// ////////////////////////////////////////////////////////////// Startup Checks +// ----------------------------------------------------------------------------- +const checks = [] +RunStartupChecks(checks) + +// //////////////////////////////////////////////////////////////// Import Model +// ----------------------------------------------------------------------------- +MC.model.Cidtemp = require('@Module_Cidtemp/model') diff --git a/packages/be/modules/cidtemp/model/index.js b/packages/be/modules/cidtemp/model/index.js new file mode 100644 index 00000000..745e13b5 --- /dev/null +++ b/packages/be/modules/cidtemp/model/index.js @@ -0,0 +1,46 @@ +console.log('💿 [model] op_cids_temp') + +// ///////////////////////////////////////////////////////////////////// Imports +// ----------------------------------------------------------------------------- +const Mongoose = require('mongoose') +const Schema = Mongoose.Schema + +// ////////////////////////////////////////////////////////////////////// Schema +// ----------------------------------------------------------------------------- +const CIDtempSchema = new Schema({ + piece_cid: { + type: String, + required: true + }, + payload_cid: { + type: String, + required: true, + index: true + }, + raw_car_file_size: { + type: Number, + required: true + }, + dataset_slug: { + type: String, + required: true + }, + web3storageCreatedAt: { + type: Date, + required: true + }, + web3storageUpdatedAt: { + type: Date, + required: true + }, + filename: { + type: String, + required: true + } +}, { + timestamps: false +}) + +// ////////////////////////////////////////////////////////////////////// Export +// ----------------------------------------------------------------------------- +module.exports = Mongoose.model('op_cids_temp', CIDtempSchema) diff --git a/packages/be/package.json b/packages/be/package.json index 6f4c4edf..698305dc 100644 --- a/packages/be/package.json +++ b/packages/be/package.json @@ -52,6 +52,7 @@ "socket.io": "^4.5.2", "socket.io-client": "^4.5.2", "uuid": "^8.3.1", - "uuid-apikey": "^1.5.1" + "uuid-apikey": "^1.5.1", + "workerpool": "^6.4.0" } } From 1359d3a93aea01a7bbf22bcd497eb3d373e857e1 Mon Sep 17 00:00:00 2001 From: svvimming Date: Fri, 5 May 2023 15:13:12 -0400 Subject: [PATCH 3/8] feat: console log statements and edge case handling --- packages/be/crons/cid-batch-import.js | 128 +++++++++++------ packages/be/crons/cid-importer.js | 133 +++++++++++------- .../open-panda-dataset-meta-bk__filter.txt | 5 + 3 files changed, 170 insertions(+), 96 deletions(-) create mode 100644 packages/be/crons/open-panda-dataset-meta-bk__filter.txt diff --git a/packages/be/crons/cid-batch-import.js b/packages/be/crons/cid-batch-import.js index fa4fbebf..295ebba7 100644 --- a/packages/be/crons/cid-batch-import.js +++ b/packages/be/crons/cid-batch-import.js @@ -46,10 +46,11 @@ try { // ///////////////////////////////////////////////////////////////////// Modules require('@Module_Database') require('@Module_Cidtemp') +const { GetFileFromDisk } = require('@Module_Utilities') // /////////////////////////////////////////////////////////////////// Functions // ------------------------------------------------------------ retrieveCidFiles -const retrieveCidFile = async (line, retryNo = 0) => { +const retrieveCidFile = async (line, batchNo, retryNo = 0) => { try { if (retryNo > 0) { console.log(`Retry number ${retryNo}`) @@ -61,34 +62,34 @@ const retrieveCidFile = async (line, retryNo = 0) => { }) // if a file already exists with this name in the temp folder, // delete it to make way for an updated version - await deleteTemporaryFile(CID_TMP_DIR, upload.name) + await deleteTemporaryFile(`batch_${batchNo}/${upload.name}`) // write file data to new zst file in the temp folder - await Pipeline(response.data, Fs.createWriteStream(`${CID_TMP_DIR}/${upload.name}`)) + await Pipeline(response.data, Fs.createWriteStream(`${CID_TMP_DIR}/batch_${batchNo}/${upload.name}`)) // unpack the zst and return the inner json - return await unpackRetrievedFile(CID_TMP_DIR, { + return await unpackRetrievedFile({ cid: upload.cid, name: upload.name, updated: upload.updated, created: upload.created - }) + }, batchNo) } catch (e) { - console.log('====================================== [Function: unpackCids]') - console.log(e) if (retryNo < 10) { console.log(`Error retrieving CID ${JSON.parse(line).cid}. Retrying retrieval...`) - await retrieveCidFile(CID_TMP_DIR, line, retryNo + 1) + await retrieveCidFile(line, batchNo, retryNo + 1) } else { const cid = JSON.parse(line).cid + await cacheFailedCID(cid) + console.log('==================================== [Function: unpackCids]') console.log(`Could not retrieve CID ${cid}. Max retries reached.`) - await cacheFailedCID(CID_TMP_DIR, cid) + console.log(e) } } } // --------------------------------------------------------------- unpackZstFile -const unpackZstFile = (file) => { +const unpackZstFile = (file, batchNo) => { return new Promise((resolve, reject) => { - const unzstd = Spawn('unzstd', [`../tmp/cid-files/${file.name}`]) + const unzstd = Spawn('unzstd', [`../tmp/cid-files/batch_${batchNo}/${file.name}`]) const errors = [] unzstd.stderr.on('data', (msg) => { errors.push(`Error unpacking ${file.name}: ${msg.toString()}`) @@ -104,13 +105,11 @@ const unpackZstFile = (file) => { } // --------------------------------------------------------- unpackRetrievedFile -const unpackRetrievedFile = async (file) => { +const unpackRetrievedFile = async (file, batchNo) => { try { const jsonFilename = file.name.substring(0, file.name.length - 4) - await deleteTemporaryFile(jsonFilename) - await unpackZstFile(file) - // await deleteTemporaryFile(file.name) // TODO : don't delete zst file in tmp directory - const json = await GetFileFromDisk(`${CID_TMP_DIR}/${jsonFilename}`, true) + await unpackZstFile(file, batchNo) + const json = await GetFileFromDisk(`${CID_TMP_DIR}/batch_${batchNo}/${jsonFilename}`, true) const fileMetadata = { piece_cid: json.piece_cid, payload_cid: json.payload_cid, @@ -120,7 +119,7 @@ const unpackRetrievedFile = async (file) => { web3storageCreatedAt: file.created, web3storageUpdatedAt: file.updated } - await deleteTemporaryFile(jsonFilename) // TODO : do delete unpacked json file in tmp directory + await deleteTemporaryFile(`batch_${batchNo}/${jsonFilename}`) return fileMetadata } catch (e) { console.log('============================ [Function: unpackRetrievedFiles]') @@ -139,10 +138,10 @@ const cacheFailedCID = async (cid) => { } // --------------------------------------------------------- deleteTemporaryFile -const deleteTemporaryFile = async (filename) => { +const deleteTemporaryFile = async (path) => { try { - if (Fs.existsSync(`${CID_TMP_DIR}/${filename}`)) { - Fs.unlinkSync(`${CID_TMP_DIR}/${filename}`) + if (Fs.existsSync(`${CID_TMP_DIR}/${path}`)) { + Fs.unlinkSync(`${CID_TMP_DIR}/${path}`) } } catch (e) { console.log('============================ [Function: deleteTemporaryFile ]') @@ -173,12 +172,35 @@ const writeBatchMetadataToDatabase = async (retrievedFiles) => { } } -// ------------------------------------------------ backupFilesToBackblazeBucket -const backupFilesToBackblazeBucket = async (retrievedFiles) => { +// ------------------------------------------------- backupCidsToBackblazeBucket +const backupCidsToBackblazeBucket = async (batchNo) => { try { - console.log('backup started') + const rclone = Spawn('rclone', [ + 'copy', + `${MC.tmpRoot}/cid-files/batch_${batchNo}`, + process.env.B2_OPENPANDA_BUCKET, + '--filter-from', + `${MC.packageRoot}/crons/open-panda-dataset-meta-bk__filter.txt` + // process.env.B2_OPENPANDA_FILTER + ]) + const errors = [] + for await (const msg of rclone.stderr) { + errors.push(msg.toString()) + } + return await new Promise((resolve, reject) => { + rclone.on('exit', (code) => { + const err = errors.length > 0 && code !== 0 + err ? reject({ + success: false, + message: errors.join('\n\n') + }) : resolve({ + success: true, + message: `✓ CID batch ${batchNo} backup successful` + }) + }) + }) } catch (e) { - console.log('==================== [Function: backupFilesToBackblazeBucket]') + console.log('===================== [Function: backupCidsToBackblazeBucket]') console.log(e) } } @@ -187,31 +209,45 @@ const backupFilesToBackblazeBucket = async (retrievedFiles) => { // -------------------------------------------------------- processManifestBatch const processManifestBatch = async (batch, batchNo) => { try { + // make a temporary subdirectory for this batch + if (!Fs.existsSync(`${CID_TMP_DIR}/batch_${batchNo}`)) { + Fs.mkdirSync(`${CID_TMP_DIR}/batch_${batchNo}`) + } + // individually download each CID file in the batch + // save zst to a temp/cid-files/batch_x folder, extract and return metadata + // to the retrieved array const len = batch.length - const uploaded = Math.floor(Math.random() * len) - const modified = len - uploaded - console.log(len, uploaded, modified) - // const retrievedFiles = [] - // for (let i = 0; i < len; i++) { - // const cidManifestItem = batch[i] - // const retrieved = await retrieveCidFile(cidManifestItem) - // if (retrieved) { retrievedFiles.push(retrieved) } - // } - // console.log(retrievedFiles) - // const databaseWriteResult = await writeBatchMetadataToDatabase(retrievedFiles) - // const batchBackupResult = await backupFilesToBackblazeBucket(retrievedFiles) - return new Promise((resolve, reject) => { - setTimeout(() => { - const result = new WorkerPool.Transfer({ - batch: batchNo, - databaseWriteResult: { - nUpserted: uploaded, - nModified: modified - }, - batchBackupResult: false + const retrievedFiles = [] + for (let i = 0; i < len; i++) { + const cidManifestItem = batch[i] + const retrieved = await retrieveCidFile(cidManifestItem, batchNo) + if (retrieved) { retrievedFiles.push(retrieved) } + } + if (!retrievedFiles.length) { + throw new Error('No CIDs could be retrieved from this batch') + } + // save batch metadata to the database + const databaseWriteResult = await writeBatchMetadataToDatabase(retrievedFiles) + // backup zst files in the corresponding temp/cid-files/batch_x folder to backblaze + const batchBackupResult = await backupCidsToBackblazeBucket(batchNo) + // if the backup is successful clean up temp folder by deleting batch + if (batchBackupResult && batchBackupResult.success) { + if (Fs.existsSync(`${CID_TMP_DIR}/batch_${batchNo}`)) { + Fs.rm(`${CID_TMP_DIR}/batch_${batchNo}`, { recursive: true, force: true }) + } + } + // return results to the main thread + return await new Promise((resolve, reject) => { + if (!databaseWriteResult || !batchBackupResult) { + reject() + } else { + const result = new WorkerPool.Transfer({ + batchNo: batchNo, + databaseWriteResult: databaseWriteResult, + batchBackupResult: batchBackupResult }) resolve(result) - }, 3000) + } }) } catch (e) { console.log('============================ [Function: processManifestBatch]') diff --git a/packages/be/crons/cid-importer.js b/packages/be/crons/cid-importer.js index a0b61b16..ecdf692a 100644 --- a/packages/be/crons/cid-importer.js +++ b/packages/be/crons/cid-importer.js @@ -31,8 +31,8 @@ const Pool = WorkerPool.pool(Path.resolve(__dirname, './cid-batch-import.js'), { maxWorkers: numThreads, workerType: 'thread' }) - let startTime +let manifestLength = 0 // ////////////////////////////////////////////////////////////////// Initialize MC.app = Express() @@ -63,42 +63,70 @@ require('@Module_Cidtemp') const { SecondsToHms } = require('@Module_Utilities') // /////////////////////////////////////////////////////////////////// Functions +// ------------------------------------------------------- getCurrentImportTotal +const getCurrentImportTotal = (results) => { + const len = results.length + let dbTotalImported = 0 + let dbTotalModified = 0 + for (let i = 0; i < len; i++) { + const result = results[i] + if (result && result.databaseWriteResult) { + const newlyImported = result.databaseWriteResult.nUpserted || 0 + const newlyModified = result.databaseWriteResult.nModified || 0 + dbTotalImported = dbTotalImported + newlyImported + dbTotalModified = dbTotalModified + newlyModified + } + } + return { + imported: dbTotalImported, + modified: dbTotalModified, + total: dbTotalImported + dbTotalModified + } +} + // --------------------------------------------------- performCidBatchOperations -const performCidBatchOperations = (manifest, batchNo, tasks, results) => { +const performCidBatchOperations = (manifest, batchNo, tasks, results, retryQueue, inRetryLoop) => { try { + if (inRetryLoop) { + console.log('The following batch previously failed and is being retried:') + } const batchSize = argv.pagesize || 1000 const batch = manifest.slice(0, batchSize) tasks.push( - Pool.exec('processManifestBatch', [batch, batchNo]).then((result) => { + Pool.exec('processManifestBatch', [batch, batchNo]).then(async (result) => { + if (!result || !result.databaseWriteResult || !result.batchBackupResult) { + throw new Error(`An issue occured with batch ${batchNo}`) + } results.push(result) - imported = result.databaseWriteResult.nUpserted + result.databaseWriteResult.nModified - console.log(`Batch ${result.batch} finished | ${imported} CIDs were imported to the database in this batch`) - }).catch((err) => { + const imported = `${result.databaseWriteResult.nUpserted + result.databaseWriteResult.nModified} CIDs were imported to the db in this batch` + const activeTasks = `${Pool.stats().activeTasks} batches currently being processed` + const pendingTasks = `${Pool.stats().pendingTasks} pending batches remaining` + const currentTotals = await getCurrentImportTotal(results) + console.log(`Batch ${result.batchNo} finished | ${imported} | ${result.batchBackupResult.message} | ${activeTasks} | ${pendingTasks} | ${currentTotals.total} of ${manifestLength} CIDs completed`) + }).catch((e) => { + retryQueue.push(batch) console.log('================================ Error returned by worker') - console.error(err) + console.error(e) }) ) manifest.splice(0, batchSize) if (manifest.length) { - performCidBatchOperations(manifest, ++batchNo, tasks, results) + performCidBatchOperations(manifest, ++batchNo, tasks, results, retryQueue, inRetryLoop) + } else if (retryQueue.length) { + performCidBatchOperations(retryQueue, 1, tasks, results, [], true) } else { Promise.all(tasks).catch((e) => { console.log('=========================== Error returned by worker pool') console.log(e) - }).then(() => { - Pool.terminate() + process.exit(0) + }).then(async () => { const len = results.length - console.log(results) - let dbTotalImported = 0 - let dbTotalModified = 0 - for (let i = 0; i < len; i++) { - const result = results[i] - dbTotalImported = dbTotalImported + result.databaseWriteResult.nUpserted - dbTotalModified = dbTotalModified + result.databaseWriteResult.nModified - } const endTime = process.hrtime()[0] + const finalResults = await getCurrentImportTotal(results) console.log(`📒 CID import finished | took ${SecondsToHms(endTime - startTime)}`) - console.log(`A total of ${dbTotalImported + dbTotalModified} CIDs were processed by the database | ${dbTotalImported} imported | ${dbTotalModified} modified`) + console.log(`A total of ${finalResults.total} CIDs were processed by the database | ${finalResults.imported} imported | ${finalResults.modified} modified` | `${finalResults.total} total CIDs successfully backed up`) + console.log(`${manifestLength - finalResults.total} CIDs either already existed in the database or could not be retrieved and have been cached for the next import.`) + Pool.terminate() process.exit(0) }) } @@ -123,8 +151,9 @@ const getCidFilesFromManifestList = async () => { } // reverse the array to import oldest CIDs first manifest.reverse() + manifestLength = manifest.length // pass manifest on to the worker pool - performCidBatchOperations(manifest, 1, [], []) + performCidBatchOperations(manifest, 1, [], [], [], false) } catch (e) { console.log('===================== [Function: getCidFilesFromManifestList]') console.log(e) @@ -184,36 +213,40 @@ const CidImporter = async () => { startTime = process.hrtime()[0] const limit = argv.pagesize || 1000 const maxPages = argv.maxpages || Infinity - // console.log(`📖 CID import started | page size of ${limit} and page maximum ${maxPages}.`) - // // Get the latest upload entry from the database - // // const mostRecentCid = await MC.model.Cidtemp.find().sort({ web3storageCreatedAt: -1 }).limit(1) // TODO : uncomment line and replace with override option - // const mostRecentDocument = false // mostRecentCid[0] - // console.log('Most recent CID imported:') - // console.log(mostRecentDocument) - // const lastSavedDate = mostRecentDocument ? new Date(mostRecentDocument.web3storageCreatedAt).getTime() : 0 - // // Delete the outdated manifest file if it exists - // // await deleteTemporaryFile('cid-manifest.txt') - // if (Fs.existsSync(`${CID_TMP_DIR}/cid-manifest.txt`)) { - // Fs.unlinkSync(`${CID_TMP_DIR}/cid-manifest.txt`) - // } - // /** - // * Build a manifest list of all cids not yet uploaded to the database: - // * args: - // * params passed to the initial api upload list request - // * limit number of pages retrieved. Set to Infinity to retrieve all CIDs since the most recently uploaded saved in the db - // * the most recent upload saved to our database (so as to request only more newer uploads) - // */ - // await createManifestFromWeb3StorageCids({ - // size: limit, - // page: 1, - // sortBy: 'Date', - // sortOrder: 'Desc' - // }, maxPages, lastSavedDate) - // /** - // * Retrieve and unpack files one by one from the manifest list and bulkWrite contents to the database - // * args: - // * limit number of entries to the database (this will only be used for test) - // */ + console.log(`📖 CID import started | page size of ${limit} and page maximum ${maxPages}.`) + // Get the latest upload entry from the database + const mostRecentCid = await MC.model.Cidtemp.find().sort({ web3storageCreatedAt: -1 }).limit(1) + let mostRecentDocument = mostRecentCid[0] + if (argv.all) { + mostRecentDocument = false + } else { + console.log('Most recent CID imported:') + console.log(mostRecentDocument) + } + const lastSavedDate = mostRecentDocument ? new Date(mostRecentDocument.web3storageCreatedAt).getTime() : 0 + // Delete the outdated manifest file if it exists + // await deleteTemporaryFile('cid-manifest.txt') + if (Fs.existsSync(`${CID_TMP_DIR}/cid-manifest.txt`)) { + Fs.unlinkSync(`${CID_TMP_DIR}/cid-manifest.txt`) + } + /** + * Build a manifest list of all cids not yet uploaded to the database: + * args: + * params passed to the initial api upload list request + * limit number of pages retrieved. Set to Infinity to retrieve all CIDs since the most recently uploaded saved in the db + * the most recent upload saved to our database (so as to request only more newer uploads) + */ + await createManifestFromWeb3StorageCids({ + size: limit, + page: 1, + sortBy: 'Date', + sortOrder: 'Desc' + }, maxPages, lastSavedDate) + /** + * Retrieve and unpack files one by one from the manifest list and bulkWrite contents to the database + * args: + * limit number of entries to the database (this will only be used for test) + */ await getCidFilesFromManifestList() } catch (e) { console.log('===================================== [Function: CidImporter]') diff --git a/packages/be/crons/open-panda-dataset-meta-bk__filter.txt b/packages/be/crons/open-panda-dataset-meta-bk__filter.txt new file mode 100644 index 00000000..1e854a9d --- /dev/null +++ b/packages/be/crons/open-panda-dataset-meta-bk__filter.txt @@ -0,0 +1,5 @@ +# Filtering rules (Arrange the order of filter rules with the most restrictive first and work down.) ++ *.zst + +# Exclude everything else +- ** From b3e4b8c515d6f4002a2915e7716b9b08b9503247 Mon Sep 17 00:00:00 2001 From: svvimming Date: Mon, 29 May 2023 20:40:30 -0400 Subject: [PATCH 4/8] test: move batch process function to separate script --- packages/be/crons/cid-importer.js | 138 +++++++++--------- .../be/crons/worker-pool-batch-processor.js | 95 ++++++++++++ 2 files changed, 164 insertions(+), 69 deletions(-) create mode 100644 packages/be/crons/worker-pool-batch-processor.js diff --git a/packages/be/crons/cid-importer.js b/packages/be/crons/cid-importer.js index ecdf692a..1d39ce76 100644 --- a/packages/be/crons/cid-importer.js +++ b/packages/be/crons/cid-importer.js @@ -64,77 +64,77 @@ const { SecondsToHms } = require('@Module_Utilities') // /////////////////////////////////////////////////////////////////// Functions // ------------------------------------------------------- getCurrentImportTotal -const getCurrentImportTotal = (results) => { - const len = results.length - let dbTotalImported = 0 - let dbTotalModified = 0 - for (let i = 0; i < len; i++) { - const result = results[i] - if (result && result.databaseWriteResult) { - const newlyImported = result.databaseWriteResult.nUpserted || 0 - const newlyModified = result.databaseWriteResult.nModified || 0 - dbTotalImported = dbTotalImported + newlyImported - dbTotalModified = dbTotalModified + newlyModified - } - } - return { - imported: dbTotalImported, - modified: dbTotalModified, - total: dbTotalImported + dbTotalModified - } -} +// const getCurrentImportTotal = (results) => { +// const len = results.length +// let dbTotalImported = 0 +// let dbTotalModified = 0 +// for (let i = 0; i < len; i++) { +// const result = results[i] +// if (result && result.databaseWriteResult) { +// const newlyImported = result.databaseWriteResult.nUpserted || 0 +// const newlyModified = result.databaseWriteResult.nModified || 0 +// dbTotalImported = dbTotalImported + newlyImported +// dbTotalModified = dbTotalModified + newlyModified +// } +// } +// return { +// imported: dbTotalImported, +// modified: dbTotalModified, +// total: dbTotalImported + dbTotalModified +// } +// } // --------------------------------------------------- performCidBatchOperations -const performCidBatchOperations = (manifest, batchNo, tasks, results, retryQueue, inRetryLoop) => { - try { - if (inRetryLoop) { - console.log('The following batch previously failed and is being retried:') - } - const batchSize = argv.pagesize || 1000 - const batch = manifest.slice(0, batchSize) - tasks.push( - Pool.exec('processManifestBatch', [batch, batchNo]).then(async (result) => { - if (!result || !result.databaseWriteResult || !result.batchBackupResult) { - throw new Error(`An issue occured with batch ${batchNo}`) - } - results.push(result) - const imported = `${result.databaseWriteResult.nUpserted + result.databaseWriteResult.nModified} CIDs were imported to the db in this batch` - const activeTasks = `${Pool.stats().activeTasks} batches currently being processed` - const pendingTasks = `${Pool.stats().pendingTasks} pending batches remaining` - const currentTotals = await getCurrentImportTotal(results) - console.log(`Batch ${result.batchNo} finished | ${imported} | ${result.batchBackupResult.message} | ${activeTasks} | ${pendingTasks} | ${currentTotals.total} of ${manifestLength} CIDs completed`) - }).catch((e) => { - retryQueue.push(batch) - console.log('================================ Error returned by worker') - console.error(e) - }) - ) - manifest.splice(0, batchSize) - if (manifest.length) { - performCidBatchOperations(manifest, ++batchNo, tasks, results, retryQueue, inRetryLoop) - } else if (retryQueue.length) { - performCidBatchOperations(retryQueue, 1, tasks, results, [], true) - } else { - Promise.all(tasks).catch((e) => { - console.log('=========================== Error returned by worker pool') - console.log(e) - process.exit(0) - }).then(async () => { - const len = results.length - const endTime = process.hrtime()[0] - const finalResults = await getCurrentImportTotal(results) - console.log(`📒 CID import finished | took ${SecondsToHms(endTime - startTime)}`) - console.log(`A total of ${finalResults.total} CIDs were processed by the database | ${finalResults.imported} imported | ${finalResults.modified} modified` | `${finalResults.total} total CIDs successfully backed up`) - console.log(`${manifestLength - finalResults.total} CIDs either already existed in the database or could not be retrieved and have been cached for the next import.`) - Pool.terminate() - process.exit(0) - }) - } - } catch (e) { - console.log('======================= [Function: performCidBatchOperations]') - console.log(e) - } -} +// const performCidBatchOperations = (manifest, batchNo, tasks, results, retryQueue, inRetryLoop) => { +// try { +// if (inRetryLoop) { +// console.log('The following batch previously failed and is being retried:') +// } +// const batchSize = argv.pagesize || 1000 +// const batch = manifest.slice(0, batchSize) +// tasks.push( +// Pool.exec('processManifestBatch', [batch, batchNo]).then(async (result) => { +// if (!result || !result.databaseWriteResult || !result.batchBackupResult) { +// throw new Error(`An issue occured with batch ${batchNo}`) +// } +// results.push(result) +// const imported = `${result.databaseWriteResult.nUpserted + result.databaseWriteResult.nModified} CIDs were imported to the db in this batch` +// const activeTasks = `${Pool.stats().activeTasks} batches currently being processed` +// const pendingTasks = `${Pool.stats().pendingTasks} pending batches remaining` +// const currentTotals = await getCurrentImportTotal(results) +// console.log(`Batch ${result.batchNo} finished | ${imported} | ${result.batchBackupResult.message} | ${activeTasks} | ${pendingTasks} | ${currentTotals.total} of ${manifestLength} CIDs completed`) +// }).catch((e) => { +// retryQueue.push(batch) +// console.log('================================ Error returned by worker') +// console.error(e) +// }) +// ) +// manifest.splice(0, batchSize) +// if (manifest.length) { +// performCidBatchOperations(manifest, ++batchNo, tasks, results, retryQueue, inRetryLoop) +// } else if (retryQueue.length) { +// performCidBatchOperations(retryQueue, 1, tasks, results, [], true) +// } else { +// Promise.all(tasks).catch((e) => { +// console.log('=========================== Error returned by worker pool') +// console.log(e) +// process.exit(0) +// }).then(async () => { +// const len = results.length +// const endTime = process.hrtime()[0] +// const finalResults = await getCurrentImportTotal(results) +// console.log(`📒 CID import finished | took ${SecondsToHms(endTime - startTime)}`) +// console.log(`A total of ${finalResults.total} CIDs were processed by the database | ${finalResults.imported} imported | ${finalResults.modified} modified` | `${finalResults.total} total CIDs successfully backed up`) +// console.log(`${manifestLength - finalResults.total} CIDs either already existed in the database or could not be retrieved and have been cached for the next import.`) +// Pool.terminate() +// process.exit(0) +// }) +// } +// } catch (e) { +// console.log('======================= [Function: performCidBatchOperations]') +// console.log(e) +// } +// } // ------------------------------------------------- getCidFilesFromManifestList const getCidFilesFromManifestList = async () => { diff --git a/packages/be/crons/worker-pool-batch-processor.js b/packages/be/crons/worker-pool-batch-processor.js new file mode 100644 index 00000000..630d28c6 --- /dev/null +++ b/packages/be/crons/worker-pool-batch-processor.js @@ -0,0 +1,95 @@ +let startTime +let manifestLength = 0 + +const logBatchResults = (result, options, batchNo) { + if (!result) { + throw new Error(`An issue occured with batch ${batchNo}`) + } + if (Array.isArray(options.batchResultKeys) && typeof result === 'object') { + let batchResults = '' + options.batchResultKeys.forEach((key) => { + if (result.hasOwnProperty(key)) { + batchResults = batchResults + `${key}: ${result[key]}. ` + } + }) + } + const activeTasks = `${Pool.stats().activeTasks} batches currently being processed` + const pendingTasks = `${Pool.stats().pendingTasks} pending batches remaining` + console.log(`Batch ${batchNo} finished | ${activeTasks} | ${pendingTasks} | ${options.batchResultsMessage} ${batchResults}`) +} + +const logFinalResults = (results, manifestLength) => { + // const finalResults = await getCurrentImportTotal(results) + console.log(`📒 CID import finished | took ${SecondsToHms(endTime - startTime)}`) + console.log(`A total of ${finalResults.total} CIDs were processed by the database | ${finalResults.imported} imported | ${finalResults.modified} modified` | `${finalResults.total} total CIDs successfully backed up`) + console.log(`${manifestLength - finalResults.total} CIDs either already existed in the database or could not be retrieved and have been cached for the next import.`) +} + +const scheduleBatchOperation = ( + manifest, + operation, + options = {}, + batchNo = 1, + tasks = [], + results = [], + retryQueue = [], + inRetryLoop = false, + retry = false +) => { + try { + if (batchNo === 1) { + startTime = process.hrtime()[0] + manifestLength = manifest.length + } + if (inRetryLoop) { + console.log('The following batch previously failed and is being retried:') + } + const batchSize = options.batchSize + const batch = manifest.slice(0, batchSize) + tasks.push( + Pool.exec(operation, [batch, batchNo]).then(async (result) => { + logBatchResults(result, options, batchNo) + results.push(result) + }).catch((e) => { + if (retry) { retryQueue.push(batch) } + console.log('================================ Error returned by worker') + console.error(e) + }) + ) + // remove batch from the manifest list + manifest.splice(0, batchSize) + // If there are items remaining in the manifest list, schedule a next batch + // If no items remain but retry is enabled and some tasks failed, schedule retries + // Otherwise proceed with compiling final results + if (manifest.length) { + scheduleBatchOperation(manifest, operation, options, ++batchNo, tasks, results, retryQueue, inRetryLoop, retry) + } else if (retryQueue.length) { + scheduleBatchOperation(retryQueue, operation, options, 1, tasks, results, [], true) + } else { + Promise.all(tasks).catch((e) => { + console.log('=========================== Error returned by worker pool') + console.log(e) + process.exit(0) + }).then(async () => { + logFinalResults(results, manifestLength) + const endTime = process.hrtime()[0] + Pool.terminate() + process.exit(0) + }) + } + } catch (e) { + console.log('======================= [Function: performCidBatchOperations]') + console.log(e) + } +} + +// ================================================================== Initialize +// ----------------------------------------------------------------------------- +const initializeWorkerPool = async (manifest, options) => { + try { + scheduleBatchOperation(manifest, options) + } catch (e) { + console.log('=========================== [Function: Initialize Worker Pool') + console.log(e) + } +} From 21c39e05281e286715680149539e9518bdea5ed2 Mon Sep 17 00:00:00 2001 From: svvimming Date: Tue, 30 May 2023 22:37:41 -0400 Subject: [PATCH 5/8] feat: abstract worker pool setup to separate script --- packages/be/crons/cid-batch-import.js | 3 +- packages/be/crons/cid-importer.js | 127 +++++++----------- .../be/crons/worker-pool-batch-processor.js | 93 +++++++------ 3 files changed, 100 insertions(+), 123 deletions(-) diff --git a/packages/be/crons/cid-batch-import.js b/packages/be/crons/cid-batch-import.js index 295ebba7..a65445bb 100644 --- a/packages/be/crons/cid-batch-import.js +++ b/packages/be/crons/cid-batch-import.js @@ -205,7 +205,6 @@ const backupCidsToBackblazeBucket = async (batchNo) => { } } -// ////////////////////////////////////////////////////////////////////// Worker // -------------------------------------------------------- processManifestBatch const processManifestBatch = async (batch, batchNo) => { try { @@ -255,7 +254,7 @@ const processManifestBatch = async (batch, batchNo) => { } } -// ////////////////////////////////////////////////////////////////// Initialize +// /////////////////////////////////////////////////////////// Initialize Worker // ----------------------------------------------------------------------------- WorkerPool.worker({ processManifestBatch: processManifestBatch diff --git a/packages/be/crons/cid-importer.js b/packages/be/crons/cid-importer.js index 1d39ce76..23502f5e 100644 --- a/packages/be/crons/cid-importer.js +++ b/packages/be/crons/cid-importer.js @@ -18,7 +18,8 @@ const Stream = require('stream') const Pipeline = Util.promisify(Stream.pipeline) const readline = require('node:readline') const argv = require('minimist')(process.argv.slice(2)) -const WorkerPool = require('workerpool') + +const { CreateWorkerPool } = require('./worker-pool-batch-processor.js') require('dotenv').config({ path: Path.resolve(__dirname, '../.env') }) @@ -26,12 +27,6 @@ const MC = require('../config') const CID_TMP_DIR = Path.resolve(`${MC.packageRoot}/tmp/cid-files`) -const numThreads = argv.threads || 16 -const Pool = WorkerPool.pool(Path.resolve(__dirname, './cid-batch-import.js'), { - maxWorkers: numThreads, - workerType: 'thread' -}) -let startTime let manifestLength = 0 // ////////////////////////////////////////////////////////////////// Initialize @@ -63,78 +58,44 @@ require('@Module_Cidtemp') const { SecondsToHms } = require('@Module_Utilities') // /////////////////////////////////////////////////////////////////// Functions -// ------------------------------------------------------- getCurrentImportTotal -// const getCurrentImportTotal = (results) => { -// const len = results.length -// let dbTotalImported = 0 -// let dbTotalModified = 0 -// for (let i = 0; i < len; i++) { -// const result = results[i] -// if (result && result.databaseWriteResult) { -// const newlyImported = result.databaseWriteResult.nUpserted || 0 -// const newlyModified = result.databaseWriteResult.nModified || 0 -// dbTotalImported = dbTotalImported + newlyImported -// dbTotalModified = dbTotalModified + newlyModified -// } -// } -// return { -// imported: dbTotalImported, -// modified: dbTotalModified, -// total: dbTotalImported + dbTotalModified -// } -// } +// ------------------------------------------------------ logCurrentImportTotals +const logCurrentImportTotals = (currentResult, batchNo, resultsToDate) => { + const len = resultsToDate.length + let dbTotalImported = 0 + let dbTotalModified = 0 + for (let i = 0; i < len; i++) { + const result = resultsToDate[i] + if (result && result.databaseWriteResult) { + const newlyImported = result.databaseWriteResult.nUpserted || 0 + const newlyModified = result.databaseWriteResult.nModified || 0 + dbTotalImported = dbTotalImported + newlyImported + dbTotalModified = dbTotalModified + newlyModified + } + } + if ( + typeof currentResult === 'object' && + typeof currentResult.databaseWriteResult === 'object' && + typeof currentResult.batchBackupResult === 'object' + ) { + const currentImportStats = `${currentResult.databaseWriteResult.nUpserted + currentResult.databaseWriteResult.nModified} CIDs were imported to the db in this batch` + console.log(` ${currentImportStats} | ${currentResult.batchBackupResult.message} | ${dbTotalImported + dbTotalModified} of ${manifestLength} CIDs completed`) + } +} -// --------------------------------------------------- performCidBatchOperations -// const performCidBatchOperations = (manifest, batchNo, tasks, results, retryQueue, inRetryLoop) => { -// try { -// if (inRetryLoop) { -// console.log('The following batch previously failed and is being retried:') -// } -// const batchSize = argv.pagesize || 1000 -// const batch = manifest.slice(0, batchSize) -// tasks.push( -// Pool.exec('processManifestBatch', [batch, batchNo]).then(async (result) => { -// if (!result || !result.databaseWriteResult || !result.batchBackupResult) { -// throw new Error(`An issue occured with batch ${batchNo}`) -// } -// results.push(result) -// const imported = `${result.databaseWriteResult.nUpserted + result.databaseWriteResult.nModified} CIDs were imported to the db in this batch` -// const activeTasks = `${Pool.stats().activeTasks} batches currently being processed` -// const pendingTasks = `${Pool.stats().pendingTasks} pending batches remaining` -// const currentTotals = await getCurrentImportTotal(results) -// console.log(`Batch ${result.batchNo} finished | ${imported} | ${result.batchBackupResult.message} | ${activeTasks} | ${pendingTasks} | ${currentTotals.total} of ${manifestLength} CIDs completed`) -// }).catch((e) => { -// retryQueue.push(batch) -// console.log('================================ Error returned by worker') -// console.error(e) -// }) -// ) -// manifest.splice(0, batchSize) -// if (manifest.length) { -// performCidBatchOperations(manifest, ++batchNo, tasks, results, retryQueue, inRetryLoop) -// } else if (retryQueue.length) { -// performCidBatchOperations(retryQueue, 1, tasks, results, [], true) -// } else { -// Promise.all(tasks).catch((e) => { -// console.log('=========================== Error returned by worker pool') -// console.log(e) -// process.exit(0) -// }).then(async () => { -// const len = results.length -// const endTime = process.hrtime()[0] -// const finalResults = await getCurrentImportTotal(results) -// console.log(`📒 CID import finished | took ${SecondsToHms(endTime - startTime)}`) -// console.log(`A total of ${finalResults.total} CIDs were processed by the database | ${finalResults.imported} imported | ${finalResults.modified} modified` | `${finalResults.total} total CIDs successfully backed up`) -// console.log(`${manifestLength - finalResults.total} CIDs either already existed in the database or could not be retrieved and have been cached for the next import.`) -// Pool.terminate() -// process.exit(0) -// }) -// } -// } catch (e) { -// console.log('======================= [Function: performCidBatchOperations]') -// console.log(e) -// } -// } +const logFinalImportResults = (results, errors) => { + console.log(`📒 CID import & backup finished | ${results.length} total batches processed`) + const len = errors.length + failedCids = [] + for (let i = 0; i < len; i++) { + const error = errors[i] + failedCids.push(error.batch) + } + failedCids.flat() + if (failedCids.length) { + console.log(`${failedCids.length} CID imports/backups were unsuccessful:`) + console.log(failedCids) + } +} // ------------------------------------------------- getCidFilesFromManifestList const getCidFilesFromManifestList = async () => { @@ -153,7 +114,13 @@ const getCidFilesFromManifestList = async () => { manifest.reverse() manifestLength = manifest.length // pass manifest on to the worker pool - performCidBatchOperations(manifest, 1, [], [], [], false) + const options = { + threads: argv.threads, + batchSize: argv.pagesize || 1000, + onBatchComplete: logCurrentImportTotals, + onWorkerPoolComplete: logFinalImportResults + } + CreateWorkerPool(Path.resolve(__dirname, './cid-batch-import.js'), 'processManifestBatch', manifest, options) } catch (e) { console.log('===================== [Function: getCidFilesFromManifestList]') console.log(e) @@ -248,6 +215,8 @@ const CidImporter = async () => { * limit number of entries to the database (this will only be used for test) */ await getCidFilesFromManifestList() + const endTime = process.hrtime()[0] + console.log(`CID manifest took ${SecondsToHms(endTime - startTime)}`) } catch (e) { console.log('===================================== [Function: CidImporter]') console.log(e) diff --git a/packages/be/crons/worker-pool-batch-processor.js b/packages/be/crons/worker-pool-batch-processor.js index 630d28c6..695f1ea6 100644 --- a/packages/be/crons/worker-pool-batch-processor.js +++ b/packages/be/crons/worker-pool-batch-processor.js @@ -1,78 +1,80 @@ +// ///////////////////////////////////////////////////// Imports + general setup +// ----------------------------------------------------------------------------- +const WorkerPool = require('workerpool') + let startTime -let manifestLength = 0 -const logBatchResults = (result, options, batchNo) { - if (!result) { - throw new Error(`An issue occured with batch ${batchNo}`) - } - if (Array.isArray(options.batchResultKeys) && typeof result === 'object') { - let batchResults = '' - options.batchResultKeys.forEach((key) => { - if (result.hasOwnProperty(key)) { - batchResults = batchResults + `${key}: ${result[key]}. ` - } - }) - } - const activeTasks = `${Pool.stats().activeTasks} batches currently being processed` - const pendingTasks = `${Pool.stats().pendingTasks} pending batches remaining` - console.log(`Batch ${batchNo} finished | ${activeTasks} | ${pendingTasks} | ${options.batchResultsMessage} ${batchResults}`) +// /////////////////////////////////////////////////////////////////// Functions +// ------------------------------------ Convert Seconds To Hours|Minutes|Seconds +const SecondsToHms = (seconds) => { + const input = Number(seconds) + const h = Math.floor(input / 3600) + const m = Math.floor(input % 3600 / 60) + const s = Math.floor(input % 3600 % 60) + const hDisplay = h > 0 ? h + (h === 1 ? ' hour, ' : ' hours, ') : '' + const mDisplay = m > 0 ? m + (m === 1 ? ' minute, ' : ' minutes, ') : '' + const sDisplay = s > 0 ? s + (s === 1 ? ' second' : ' seconds') : '' + return hDisplay + mDisplay + sDisplay } -const logFinalResults = (results, manifestLength) => { - // const finalResults = await getCurrentImportTotal(results) - console.log(`📒 CID import finished | took ${SecondsToHms(endTime - startTime)}`) - console.log(`A total of ${finalResults.total} CIDs were processed by the database | ${finalResults.imported} imported | ${finalResults.modified} modified` | `${finalResults.total} total CIDs successfully backed up`) - console.log(`${manifestLength - finalResults.total} CIDs either already existed in the database or could not be retrieved and have been cached for the next import.`) +// ---------------------------------------------------------------- LogPoolStats +const logPoolStatus = (batchNo) => { + const activeTasks = `${Pool.stats().activeTasks} batches currently being processed` + const pendingTasks = `${Pool.stats().pendingTasks} pending batches remaining` + console.log(`Batch ${batchNo} finished | ${activeTasks} | ${pendingTasks}`) } +// ----------------------------------------------------- ScheduleBatchOperations const scheduleBatchOperation = ( + Pool, manifest, operation, options = {}, batchNo = 1, tasks = [], results = [], - retryQueue = [], - inRetryLoop = false, - retry = false + errors = [] ) => { try { - if (batchNo === 1) { + if (batchNo === 1) { startTime = process.hrtime()[0] - manifestLength = manifest.length + console.log('🤖 Worker pool started') } - if (inRetryLoop) { - console.log('The following batch previously failed and is being retried:') - } - const batchSize = options.batchSize + const batchSize = options.batchSize || 10 const batch = manifest.slice(0, batchSize) tasks.push( Pool.exec(operation, [batch, batchNo]).then(async (result) => { - logBatchResults(result, options, batchNo) + if (!result) { + throw new Error(`An issue occured with batch ${batchNo}`) + } results.push(result) + logPoolStatus(batchNo) + if (typeof options.onBatchComplete === 'function') { + options.onBatchComplete(result, batchNo, results) + } }).catch((e) => { - if (retry) { retryQueue.push(batch) } - console.log('================================ Error returned by worker') + errors.push({ batchNo, batch, error: e }) + console.log(`Error returned by worker: Could not process batch ${batchNo}.`) console.error(e) }) ) // remove batch from the manifest list manifest.splice(0, batchSize) // If there are items remaining in the manifest list, schedule a next batch - // If no items remain but retry is enabled and some tasks failed, schedule retries // Otherwise proceed with compiling final results if (manifest.length) { - scheduleBatchOperation(manifest, operation, options, ++batchNo, tasks, results, retryQueue, inRetryLoop, retry) - } else if (retryQueue.length) { - scheduleBatchOperation(retryQueue, operation, options, 1, tasks, results, [], true) + scheduleBatchOperation(Pool, manifest, operation, options, ++batchNo, tasks, results, errors) } else { Promise.all(tasks).catch((e) => { console.log('=========================== Error returned by worker pool') console.log(e) process.exit(0) }).then(async () => { - logFinalResults(results, manifestLength) const endTime = process.hrtime()[0] + console.log(`✅ Worker pool task list complete | took ${SecondsToHms(endTime - startTime)}`) + if (typeof options.onWorkerPoolComplete === 'function') { + options.onWorkerPoolComplete(results, errors) + } Pool.terminate() process.exit(0) }) @@ -83,13 +85,20 @@ const scheduleBatchOperation = ( } } -// ================================================================== Initialize -// ----------------------------------------------------------------------------- -const initializeWorkerPool = async (manifest, options) => { +// ------------------------------------------------------------ CreateWorkerPool +const CreateWorkerPool = async (pathToScript, operation, manifest, options) => { try { - scheduleBatchOperation(manifest, options) + const Pool = WorkerPool.pool(pathToScript, { + maxWorkers: options.threads || 15, + workerType: 'thread' + }) + scheduleBatchOperation(Pool, manifest, operation, options) } catch (e) { console.log('=========================== [Function: Initialize Worker Pool') console.log(e) } } + +module.exports = { + CreateWorkerPool +} From 9a4dd06eb8ec02e859b26ecc6d44c0f45587cecf Mon Sep 17 00:00:00 2001 From: svvimming Date: Wed, 31 May 2023 15:56:08 -0400 Subject: [PATCH 6/8] feat: node worker pool management abstracted into separate script --- packages/be/crons/cid-batch-import.js | 19 ++++++++++------- packages/be/crons/cid-importer.js | 9 ++++---- .../be/crons/worker-pool-batch-processor.js | 21 +++++++++++-------- 3 files changed, 27 insertions(+), 22 deletions(-) diff --git a/packages/be/crons/cid-batch-import.js b/packages/be/crons/cid-batch-import.js index a65445bb..e744836b 100644 --- a/packages/be/crons/cid-batch-import.js +++ b/packages/be/crons/cid-batch-import.js @@ -235,17 +235,20 @@ const processManifestBatch = async (batch, batchNo) => { Fs.rm(`${CID_TMP_DIR}/batch_${batchNo}`, { recursive: true, force: true }) } } + let result = undefined + if (databaseWriteResult && batchBackupResult) { + result = { + batchNo: batchNo, + databaseWriteResult: databaseWriteResult, + batchBackupResult: batchBackupResult + } + } // return results to the main thread return await new Promise((resolve, reject) => { - if (!databaseWriteResult || !batchBackupResult) { - reject() + if (result) { + resolve(new WorkerPool.Transfer(result)) } else { - const result = new WorkerPool.Transfer({ - batchNo: batchNo, - databaseWriteResult: databaseWriteResult, - batchBackupResult: batchBackupResult - }) - resolve(result) + reject() } }) } catch (e) { diff --git a/packages/be/crons/cid-importer.js b/packages/be/crons/cid-importer.js index 23502f5e..204c9271 100644 --- a/packages/be/crons/cid-importer.js +++ b/packages/be/crons/cid-importer.js @@ -18,7 +18,6 @@ const Stream = require('stream') const Pipeline = Util.promisify(Stream.pipeline) const readline = require('node:readline') const argv = require('minimist')(process.argv.slice(2)) - const { CreateWorkerPool } = require('./worker-pool-batch-processor.js') require('dotenv').config({ path: Path.resolve(__dirname, '../.env') }) @@ -85,12 +84,12 @@ const logCurrentImportTotals = (currentResult, batchNo, resultsToDate) => { const logFinalImportResults = (results, errors) => { console.log(`📒 CID import & backup finished | ${results.length} total batches processed`) const len = errors.length - failedCids = [] + failedBatches = [] for (let i = 0; i < len; i++) { const error = errors[i] - failedCids.push(error.batch) + failedBatches.push(error.batch) } - failedCids.flat() + const failedCids = failedBatches.flat() if (failedCids.length) { console.log(`${failedCids.length} CID imports/backups were unsuccessful:`) console.log(failedCids) @@ -117,7 +116,7 @@ const getCidFilesFromManifestList = async () => { const options = { threads: argv.threads, batchSize: argv.pagesize || 1000, - onBatchComplete: logCurrentImportTotals, + onBatchResult: logCurrentImportTotals, onWorkerPoolComplete: logFinalImportResults } CreateWorkerPool(Path.resolve(__dirname, './cid-batch-import.js'), 'processManifestBatch', manifest, options) diff --git a/packages/be/crons/worker-pool-batch-processor.js b/packages/be/crons/worker-pool-batch-processor.js index 695f1ea6..5bae7e4e 100644 --- a/packages/be/crons/worker-pool-batch-processor.js +++ b/packages/be/crons/worker-pool-batch-processor.js @@ -1,5 +1,6 @@ // ///////////////////////////////////////////////////// Imports + general setup // ----------------------------------------------------------------------------- +const Fs = require('fs-extra') const WorkerPool = require('workerpool') let startTime @@ -18,10 +19,10 @@ const SecondsToHms = (seconds) => { } // ---------------------------------------------------------------- LogPoolStats -const logPoolStatus = (batchNo) => { +const logPoolStatus = (Pool, num) => { const activeTasks = `${Pool.stats().activeTasks} batches currently being processed` const pendingTasks = `${Pool.stats().pendingTasks} pending batches remaining` - console.log(`Batch ${batchNo} finished | ${activeTasks} | ${pendingTasks}`) + console.log(`Batch ${num} finished | ${activeTasks} | ${pendingTasks}`) } // ----------------------------------------------------- ScheduleBatchOperations @@ -42,19 +43,20 @@ const scheduleBatchOperation = ( } const batchSize = options.batchSize || 10 const batch = manifest.slice(0, batchSize) + const num = batchNo tasks.push( - Pool.exec(operation, [batch, batchNo]).then(async (result) => { + Pool.exec(operation, [batch, num]).then(async (result) => { if (!result) { - throw new Error(`An issue occured with batch ${batchNo}`) + throw new Error(`An issue occured with batch ${num}`) } results.push(result) - logPoolStatus(batchNo) - if (typeof options.onBatchComplete === 'function') { - options.onBatchComplete(result, batchNo, results) + logPoolStatus(Pool, num) + if (typeof options.onBatchResult === 'function') { + options.onBatchResult(result, num, results) } }).catch((e) => { - errors.push({ batchNo, batch, error: e }) - console.log(`Error returned by worker: Could not process batch ${batchNo}.`) + errors.push({ num, batch, error: e }) + console.log(`Error returned by worker: Could not process batch ${num}.`) console.error(e) }) ) @@ -99,6 +101,7 @@ const CreateWorkerPool = async (pathToScript, operation, manifest, options) => { } } +// --------------------------------------------------------------------- Exports module.exports = { CreateWorkerPool } From c8b5c758e565547b4e309124cf71f83920150344 Mon Sep 17 00:00:00 2001 From: svvimming Date: Fri, 2 Jun 2023 13:37:19 -0400 Subject: [PATCH 7/8] feat: documentation of abstracted worker pool module --- .gitignore | 3 + packages/be/crons/cid-batch-import.js | 11 +- packages/be/crons/cid-importer.js | 3 +- .../open-panda-dataset-meta-bk__filter.txt | 5 - packages/be/crons/x_cid-importer.js | 322 ------------------ .../worker-pool-batch-processor.js | 12 +- .../be/scripts/worker-pool-batch-processor.md | 43 +++ 7 files changed, 60 insertions(+), 339 deletions(-) delete mode 100644 packages/be/crons/open-panda-dataset-meta-bk__filter.txt delete mode 100644 packages/be/crons/x_cid-importer.js rename packages/be/{crons => scripts}/worker-pool-batch-processor.js (93%) create mode 100644 packages/be/scripts/worker-pool-batch-processor.md diff --git a/.gitignore b/.gitignore index 0163acba..2716bcc3 100644 --- a/.gitignore +++ b/.gitignore @@ -107,3 +107,6 @@ sw.* # Storybook .nuxt-storybook storybook-static + +# rclone filter rules +open-panda-dataset-meta-bk__filter.txt diff --git a/packages/be/crons/cid-batch-import.js b/packages/be/crons/cid-batch-import.js index e744836b..8fc989a8 100644 --- a/packages/be/crons/cid-batch-import.js +++ b/packages/be/crons/cid-batch-import.js @@ -9,7 +9,6 @@ const Util = require('util') const Stream = require('stream') const Pipeline = Util.promisify(Stream.pipeline) const Spawn = require('child_process').spawn -const WorkerPool = require('workerpool') const Mongoose = require('mongoose') const MongooseSlugUpdater = require('mongoose-slug-updater') @@ -47,6 +46,7 @@ try { require('@Module_Database') require('@Module_Cidtemp') const { GetFileFromDisk } = require('@Module_Utilities') +const { InitializeWorker } = require('@Root/scripts/worker-pool-batch-processor.js') // /////////////////////////////////////////////////////////////////// Functions // ------------------------------------------------------------ retrieveCidFiles @@ -180,8 +180,7 @@ const backupCidsToBackblazeBucket = async (batchNo) => { `${MC.tmpRoot}/cid-files/batch_${batchNo}`, process.env.B2_OPENPANDA_BUCKET, '--filter-from', - `${MC.packageRoot}/crons/open-panda-dataset-meta-bk__filter.txt` - // process.env.B2_OPENPANDA_FILTER + process.env.B2_OPENPANDA_FILTER ]) const errors = [] for await (const msg of rclone.stderr) { @@ -246,7 +245,7 @@ const processManifestBatch = async (batch, batchNo) => { // return results to the main thread return await new Promise((resolve, reject) => { if (result) { - resolve(new WorkerPool.Transfer(result)) + resolve(result) } else { reject() } @@ -259,6 +258,4 @@ const processManifestBatch = async (batch, batchNo) => { // /////////////////////////////////////////////////////////// Initialize Worker // ----------------------------------------------------------------------------- -WorkerPool.worker({ - processManifestBatch: processManifestBatch -}) +InitializeWorker(processManifestBatch) diff --git a/packages/be/crons/cid-importer.js b/packages/be/crons/cid-importer.js index 204c9271..bd655e3b 100644 --- a/packages/be/crons/cid-importer.js +++ b/packages/be/crons/cid-importer.js @@ -18,7 +18,6 @@ const Stream = require('stream') const Pipeline = Util.promisify(Stream.pipeline) const readline = require('node:readline') const argv = require('minimist')(process.argv.slice(2)) -const { CreateWorkerPool } = require('./worker-pool-batch-processor.js') require('dotenv').config({ path: Path.resolve(__dirname, '../.env') }) @@ -55,6 +54,7 @@ try { require('@Module_Database') require('@Module_Cidtemp') const { SecondsToHms } = require('@Module_Utilities') +const { CreateWorkerPool } = require('@Root/scripts/worker-pool-batch-processor.js') // /////////////////////////////////////////////////////////////////// Functions // ------------------------------------------------------ logCurrentImportTotals @@ -191,7 +191,6 @@ const CidImporter = async () => { } const lastSavedDate = mostRecentDocument ? new Date(mostRecentDocument.web3storageCreatedAt).getTime() : 0 // Delete the outdated manifest file if it exists - // await deleteTemporaryFile('cid-manifest.txt') if (Fs.existsSync(`${CID_TMP_DIR}/cid-manifest.txt`)) { Fs.unlinkSync(`${CID_TMP_DIR}/cid-manifest.txt`) } diff --git a/packages/be/crons/open-panda-dataset-meta-bk__filter.txt b/packages/be/crons/open-panda-dataset-meta-bk__filter.txt deleted file mode 100644 index 1e854a9d..00000000 --- a/packages/be/crons/open-panda-dataset-meta-bk__filter.txt +++ /dev/null @@ -1,5 +0,0 @@ -# Filtering rules (Arrange the order of filter rules with the most restrictive first and work down.) -+ *.zst - -# Exclude everything else -- ** diff --git a/packages/be/crons/x_cid-importer.js b/packages/be/crons/x_cid-importer.js deleted file mode 100644 index 9721378f..00000000 --- a/packages/be/crons/x_cid-importer.js +++ /dev/null @@ -1,322 +0,0 @@ -/** - * - * ⏱️️ [Cron | weekly] CID Importer - * - * Caches CID data - * - */ - -// ///////////////////////////////////////////////////// Imports + general setup -// ----------------------------------------------------------------------------- -const ModuleAlias = require('module-alias') -const Path = require('path') -const Axios = require('axios') -const Fs = require('fs-extra') -const Express = require('express') -const Util = require('util') -const Stream = require('stream') -const Pipeline = Util.promisify(Stream.pipeline) -const readline = require('node:readline') -const Spawn = require('child_process').spawn -const argv = require('minimist')(process.argv.slice(2)) -require('dotenv').config({ path: Path.resolve(__dirname, '../.env') }) - -const MC = require('../config') - -const CID_TMP_DIR = Path.resolve(`${MC.packageRoot}/tmp/cid-files`) - -// ////////////////////////////////////////////////////////////////// Initialize -MC.app = Express() - -// ///////////////////////////////////////////////////////////////////// Aliases -ModuleAlias.addAliases({ - '@Root': MC.packageRoot, - '@Modules': `${MC.packageRoot}/modules` -}) - -try { - const modulesRoot = `${MC.packageRoot}/modules` - const items = Fs.readdirSync(modulesRoot) - items.forEach((name) => { - const path = `${modulesRoot}/${name}` - if (Fs.statSync(path).isDirectory()) { - const moduleName = `${name[0].toUpperCase() + name.substring(1)}` - ModuleAlias.addAlias(`@Module_${moduleName}`, path) - } - }) -} catch (e) { - console.log(e) -} - -// ///////////////////////////////////////////////////////////////////// Modules -require('@Module_Database') -require('@Module_Cidtemp') -const { GetFileFromDisk, SecondsToHms } = require('@Module_Utilities') - -// /////////////////////////////////////////////////////////////////// Functions -// ------------------------------------------------------------ retrieveCidFiles -const retrieveCidFile = async (line, retryNo = 0) => { - try { - if (retryNo > 0) { - console.log(`Retry number ${retryNo}`) - } - const upload = JSON.parse(line) - // fetch file using upload cid - const response = await Axios.get(`https://${upload.cid}.ipfs.w3s.link/`, { - responseType: 'stream' - }) - // if a file already exists with this name in the temp folder, - // delete it to make way for an updated version - await deleteTemporaryFile(upload.name) - // write file data to new zst file in the temp folder - await Pipeline(response.data, Fs.createWriteStream(`${CID_TMP_DIR}/${upload.name}`)) - // unpack the zst and return the inner json - return await unpackRetrievedFile({ - cid: upload.cid, - name: upload.name, - updated: upload.updated, - created: upload.created - }) - } catch (e) { - console.log('====================================== [Function: unpackCids]') - console.log(e) - if (retryNo < 10) { - console.log(`Error retrieving CID ${JSON.parse(line).cid}. Retrying retrieval...`) - await retrieveCidFile(line, retryNo + 1) - } else { - const cid = JSON.parse(line).cid - console.log(`Could not retrieve CID ${cid}. Max retries reached.`) - await cacheFailedCID(cid) - } - } -} - -// --------------------------------------------------------------- unpackZstFile -const unpackZstFile = (file) => { - return new Promise((resolve, reject) => { - const unzstd = Spawn('unzstd', [`../tmp/cid-files/${file.name}`]) - const errors = [] - unzstd.stderr.on('data', (msg) => { - errors.push(`Error unpacking ${file.name}: ${msg.toString()}`) - }) - unzstd.on('exit', (code) => { - const err = errors.length > 0 && code !== 0 - if (err) { - console.log(errors.join('\n')) - } - resolve() - }) - }) -} - -// --------------------------------------------------------- unpackRetrievedFile -const unpackRetrievedFile = async (file) => { - try { - const jsonFilename = file.name.substring(0, file.name.length - 4) - await deleteTemporaryFile(jsonFilename) - await unpackZstFile(file) - await deleteTemporaryFile(file.name) - const json = await GetFileFromDisk(`${CID_TMP_DIR}/${jsonFilename}`, true) - const fileMetadata = { - piece_cid: json.piece_cid, - payload_cid: json.payload_cid, - raw_car_file_size: json.raw_car_file_size, - dataset_slug: json.dataset, - filename: jsonFilename, - web3storageCreatedAt: file.created, - web3storageUpdatedAt: file.updated - } - await deleteTemporaryFile(jsonFilename) - return fileMetadata - } catch (e) { - console.log('============================ [Function: unpackRetrievedFiles]') - console.log(e) - } -} - -// ------------------------------------------------- writeFileMetadataToDatabase -const writeFileMetadataToDatabase = async (retrievedFiles) => { - try { - const operations = [] - const len = retrievedFiles.length - for (let i = 0; i < len; i++) { - const file = retrievedFiles[i] - operations.push({ - updateOne: { - filter: { payload_cid: file.payload_cid }, - update: { $set: file }, - upsert: true - } - }) - } - const response = await MC.model.Cidtemp.bulkWrite(operations, { ordered: false }) - return response.result - } catch (e) { - console.log('========================= [Function: writeCidFilesToDatabase]') - console.log(e) - } -} - -// -------------------------------------------------------------- cacheFailedCid -const cacheFailedCID = async (cid) => { - try { - await Pipeline(`${cid}\n`, Fs.createWriteStream(`${CID_TMP_DIR}/failed-cid-retrievals.txt`, { flags: 'a' })) - } catch (e) { - console.log('================================= [Function: cacheFailedCID ]') - console.log(e) - } -} - -// --------------------------------------------------------- deleteTemporaryFile -const deleteTemporaryFile = async (filename) => { - try { - if (Fs.existsSync(`${CID_TMP_DIR}/${filename}`)) { - Fs.unlinkSync(`${CID_TMP_DIR}/${filename}`) - } - } catch (e) { - console.log('============================ [Function: deleteTemporaryFile ]') - console.log(e) - } -} - -// ------------------------------------------------- getCidFilesFromManifestList -const getCidFilesFromManifestList = async (importMax) => { - try { - if (Fs.existsSync(`${CID_TMP_DIR}/cid-manifest.txt`)) { - const manifest = Fs.createReadStream(`${CID_TMP_DIR}/cid-manifest.txt`) - const rl = readline.createInterface({ - input: manifest, - crlfDelay: Infinity - }) - // import all lines from the manifest to an array - const manifestCidLines = [] - for await (const line of rl) { - manifestCidLines.push(line) - } - // reverse the array to import oldest CIDs first - // begin writing to the database in batches - manifestCidLines.reverse() - let retrievedFiles = [] - let total = 0 - const batchSize = argv.pagesize || 1000 - const len = manifestCidLines.length - for (let i = 0; i < len; i++) { - if (i < importMax) { - const line = manifestCidLines[i] - console.log(`Retrieving file ${i + 1} from the CID manifest list.`) - const retrieved = await retrieveCidFile(line) - if (retrieved) { retrievedFiles.push(retrieved) } - // write retrieved file data to the database in batches of 1000 - if ((i + 1) % batchSize === 0) { - const result = await writeFileMetadataToDatabase(retrievedFiles) - total = total + result.nUpserted + result.nModified - console.log(`${result.nUpserted} new CIDs imported in this batch | ${result.nModified} CIDs updated in this batch | A total of ${total} CIDs imported/updated so far.`) - retrievedFiles = [] - } - } else { - break - } - } - const result = await writeFileMetadataToDatabase(retrievedFiles) - console.log(`${result.nUpserted} new CIDs imported in this batch | ${result.nModified} CIDs updated in this batch | A total of ${total + result.nUpserted + result.nModified} CIDs were imported/updated to the database.`) - } - } catch (e) { - console.log('===================== [Function: getCidFilesFromManifestList]') - console.log(e) - } -} - -// ------------------------------------------- createManifestFromWeb3StorageCids -const createManifestFromWeb3StorageCids = async (searchParams, maxPages, lastSavedDate, count) => { - try { - const options = { headers: { Accept: 'application/json', Authorization: `Bearer ${process.env.WEB3STORAGE_TOKEN}` } } - const params = Object.keys(searchParams).map((item) => `${item}=${searchParams[item]}`).join('&') - const url = `https://api.web3.storage/user/uploads?${params}` - const response = await Axios.get(url, options) - const uploads = response.data - const len = uploads.length - let skipCount = 0 - let total = 0 - let lastSavedDateReached = false - for (let i = 0; i < len; i++) { - const upload = uploads[i] - const uploadDate = new Date(upload.created).getTime() - if (uploadDate > lastSavedDate) { - if (upload.name.endsWith('.zst')) { - const newline = JSON.stringify({ cid: upload.cid, name: upload.name, created: upload.created, updated: upload.updated }) - await Pipeline(`${newline}\n`, Fs.createWriteStream(`${CID_TMP_DIR}/cid-manifest.txt`, { flags: 'a' })) - } else { - skipCount++ - } - } else { - lastSavedDateReached = true - break - } - total = i + 1 - } - total = total - skipCount - const lineWriteCount = count ? count + total : total - console.log(`${total} new CID(s) saved to the manifest in this batch | ${skipCount} file(s) were skipped due to filetype mismatch | total of ${lineWriteCount} new CID(s) saved so far`) - if (uploads.length === searchParams.size && !lastSavedDateReached && searchParams.page < maxPages) { - searchParams.page += 1 - await createManifestFromWeb3StorageCids(searchParams, maxPages, lastSavedDate, lineWriteCount) - } else { - console.log(`Finished writing CID manifest file - a total of ${lineWriteCount} new CID(s) will be imported to the database.`) - } - } catch (e) { - console.log('=============== [Function: createManifestFromWeb3StorageCids]') - console.log(e) - if (e.response && e.response.status === 500) { - await createManifestFromWeb3StorageCids(searchParams, maxPages, lastSavedDate, count) - console.log(`Retrying fetching uploads on page ${searchParams.page}`) - } - } -} - -// ///////////////////////////////////////////////////////////////// CidImporter -const CidImporter = async () => { - try { - const start = process.hrtime()[0] - const limit = argv.pagesize || 1000 - const maxPages = argv.maxpages || Infinity - console.log(`📖 CID import started | page size of ${limit} and page maximum ${maxPages}.`) - // Get the latest upload entry from the database - const mostRecentCid = await MC.model.Cidtemp.find().sort({ web3storageCreatedAt: -1 }).limit(1) - const mostRecentDocument = mostRecentCid[0] - console.log('Most recent CID imported:') - console.log(mostRecentDocument) - const lastSavedDate = mostRecentDocument ? new Date(mostRecentDocument.web3storageCreatedAt).getTime() : 0 - // Delete the outdated manifest file if it exists - await deleteTemporaryFile('cid-manifest.txt') - /** - * Build a manifest list of all cids not yet uploaded to the database: - * args: - * params passed to the initial api upload list request - * limit number of pages retrieved. Set to Infinity to retrieve all CIDs since the most recently uploaded saved in the db - * the most recent upload saved to our database (so as to request only more newer uploads) - */ - await createManifestFromWeb3StorageCids({ - size: limit, - page: 1, - sortBy: 'Date', - sortOrder: 'Desc' - }, maxPages, lastSavedDate) - /** - * Retrieve and unpack files one by one from the manifest list and bulkWrite contents to the database - * args: - * limit number of entries to the database (this will only be used for test) - */ - await getCidFilesFromManifestList(limit * maxPages) - const end = process.hrtime()[0] - console.log(`📒 CID import finished | took ${SecondsToHms(end - start)}`) - process.exit(0) - } catch (e) { - console.log('===================================== [Function: CidImporter]') - console.log(e) - process.exit(0) - } -} - -// ////////////////////////////////////////////////////////////////// Initialize -// ----------------------------------------------------------------------------- -MC.app.on('mongoose-connected', CidImporter) diff --git a/packages/be/crons/worker-pool-batch-processor.js b/packages/be/scripts/worker-pool-batch-processor.js similarity index 93% rename from packages/be/crons/worker-pool-batch-processor.js rename to packages/be/scripts/worker-pool-batch-processor.js index 5bae7e4e..2655d9d3 100644 --- a/packages/be/crons/worker-pool-batch-processor.js +++ b/packages/be/scripts/worker-pool-batch-processor.js @@ -1,8 +1,6 @@ // ///////////////////////////////////////////////////// Imports + general setup // ----------------------------------------------------------------------------- -const Fs = require('fs-extra') const WorkerPool = require('workerpool') - let startTime // /////////////////////////////////////////////////////////////////// Functions @@ -101,7 +99,15 @@ const CreateWorkerPool = async (pathToScript, operation, manifest, options) => { } } +// ------------------------------------------------------------ InitializeWorker +const InitializeWorker = (operation) => { + const initOperation = {} + initOperation[operation.name] = operation + WorkerPool.worker(initOperation) +} + // --------------------------------------------------------------------- Exports module.exports = { - CreateWorkerPool + CreateWorkerPool, + InitializeWorker } diff --git a/packages/be/scripts/worker-pool-batch-processor.md b/packages/be/scripts/worker-pool-batch-processor.md new file mode 100644 index 00000000..218bd87d --- /dev/null +++ b/packages/be/scripts/worker-pool-batch-processor.md @@ -0,0 +1,43 @@ +## Multithread Batch Processing via Workerpool + +### Overview + +The worker-pool-batch-processor.js module manages a pool of node workers to process large lists of data or objects in batches. +It is a simple layer built on top of the [workerpool npm package](https://www.npmjs.com/package/workerpool) that specifically makes processing large datasets in batches quick and straightforward. +The worker-pool-batch-processor script exports two functions; 'CreateWorkerPool' and 'InitializeWorker'. + +CreateWorkerPool instantiates a pool of workers using the workerpool npm package. It then proceeds to siphon and queue batches from a +manifest list one at a time. The manifest list can either be a list of data, objects or references to files. How this list is processed or used for processing depends entirely on the script supplied to the +CreateWorkerPool function which is passed on to each individual worker. The CreateWorkerPool function is agnostic to the contents of the script and only handles delegating batches to workers and managing results returned from each worker. + +There are four arguments that the function will accept: + +- pathToScript: (String) an absolute path to a script which each worker will run +- operation: (String) the name of the root function in the script that the worker will interface with +- manifest: (Array) an array of data to process or use within the script +- options: (Object) an object with the following options: + - threads: (Number) the number of workers to run simultaneously + - batchSize: (Number) how big each batch of data should be that is processed by each worker + - onBatchResult: (Function) a function called that when a worker has finished a batch and returns the results of the batch processing. It is passed three arguments by the worker: + - 1. result: the result returned by the worker + - 2. num: the batch number + - 3. results: all results to date including this result as the most recent + These arguments would typically be used to display the workerpool's progress as each batch is completed. + - onWorkerPoolComplete: (Function) a function that is called on completion of processing the entire manifest list. i.e. all workers and queued tasks have completed. This function is passed two arguments: + - 1. results: (Array) an array of the results from each individual batch + - 2. errors: (Array) an array of any errors returned by a worker while processing a batch + +InitializeWorker is a simple initialization function that takes a single argument: + - operation: (Function) a function that each worker will perform on each batch it recieves. The name of this function must match the operation string described above. + + +### Use + +The module must be used with two separate scripts: + +One runs in the main thread and is the source of the manifest list passed to the worker pool. In this thread/script the CreateWorkerPool function must be imported from +the worker-pool-batch-processor module and called at the appropriate step with the arguments described above. + +Likewise, the InitializeWorker function must be imported in a script representing all processes to be executed in a worker thread. This function must be called as the +last step in the script and must be passed the top-most function in the script who's name exactly matches the operation string passed to the CreateWorkerPool function. +This function (the operation passed as an argument to InitializeWorker) must return a Promise which either resolves with the result of a processed batch or rejects upon error or null result. From fda9f159c332f2263c065204d3236f641aef862f Mon Sep 17 00:00:00 2001 From: svvimming Date: Fri, 2 Jun 2023 13:56:24 -0400 Subject: [PATCH 8/8] feat: documentation of abstracted worker pool module --- .../be/scripts/worker-pool-batch-processor.md | 38 ++++++++----------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/packages/be/scripts/worker-pool-batch-processor.md b/packages/be/scripts/worker-pool-batch-processor.md index 218bd87d..9eaa0b7b 100644 --- a/packages/be/scripts/worker-pool-batch-processor.md +++ b/packages/be/scripts/worker-pool-batch-processor.md @@ -2,42 +2,36 @@ ### Overview -The worker-pool-batch-processor.js module manages a pool of node workers to process large lists of data or objects in batches. -It is a simple layer built on top of the [workerpool npm package](https://www.npmjs.com/package/workerpool) that specifically makes processing large datasets in batches quick and straightforward. -The worker-pool-batch-processor script exports two functions; 'CreateWorkerPool' and 'InitializeWorker'. +The `worker-pool-batch-processor.js` module manages a pool of node workers to process large lists of data or objects in batches. It is a simple layer built on top of the [workerpool npm package](https://www.npmjs.com/package/workerpool) that specifically makes processing large datasets in batches quick and straightforward. The worker-pool-batch-processor script exports two functions; `CreateWorkerPool` and `InitializeWorker`. -CreateWorkerPool instantiates a pool of workers using the workerpool npm package. It then proceeds to siphon and queue batches from a -manifest list one at a time. The manifest list can either be a list of data, objects or references to files. How this list is processed or used for processing depends entirely on the script supplied to the -CreateWorkerPool function which is passed on to each individual worker. The CreateWorkerPool function is agnostic to the contents of the script and only handles delegating batches to workers and managing results returned from each worker. +`CreateWorkerPool` instantiates a pool of workers using the workerpool npm package. It then proceeds to siphon and queue batches from a manifest list one at a time. The manifest list can either be a list of data, objects or references to files. How this list is processed or used for processing depends entirely on the script supplied to `CreateWorkerPool`, the contents of which are passed on to each individual worker. The `CreateWorkerPool` function is agnostic to the contents of the script and only handles delegating batches to workers and managing results returned from each worker. There are four arguments that the function will accept: -- pathToScript: (String) an absolute path to a script which each worker will run -- operation: (String) the name of the root function in the script that the worker will interface with -- manifest: (Array) an array of data to process or use within the script -- options: (Object) an object with the following options: - - threads: (Number) the number of workers to run simultaneously - - batchSize: (Number) how big each batch of data should be that is processed by each worker - - onBatchResult: (Function) a function called that when a worker has finished a batch and returns the results of the batch processing. It is passed three arguments by the worker: +- **pathToScript**: (String) an absolute path to a script which each worker will run +- **operation**: (String) the name of the root function in the script that the worker will interface with +- **manifest**: (Array) an array of data to process or use within the script +- **options**: (Object) an object with the following options: + - _threads_: (Number) the number of workers to run simultaneously + - _batchSize_: (Number) how big each batch of data should be that is processed by each worker + - _onBatchResult_: (Function) a function called that when a worker has finished a batch and returns the results of the batch processing. It is passed three arguments by the worker: - 1. result: the result returned by the worker - 2. num: the batch number - 3. results: all results to date including this result as the most recent - These arguments would typically be used to display the workerpool's progress as each batch is completed. - - onWorkerPoolComplete: (Function) a function that is called on completion of processing the entire manifest list. i.e. all workers and queued tasks have completed. This function is passed two arguments: + These arguments would typically be used to display the worker pool's progress as each batch is completed. + - _onWorkerPoolComplete_: (Function) a function that is called on completion of processing the entire manifest list. i.e. all workers and queued tasks have completed. This function is passed two arguments: - 1. results: (Array) an array of the results from each individual batch - 2. errors: (Array) an array of any errors returned by a worker while processing a batch -InitializeWorker is a simple initialization function that takes a single argument: - - operation: (Function) a function that each worker will perform on each batch it recieves. The name of this function must match the operation string described above. +`InitializeWorker` is a simple initialization function that takes a single argument: +- **operation**: (Function) a function that each worker will perform on each batch it receives. The name of this function must match the operation string described above. ### Use The module must be used with two separate scripts: -One runs in the main thread and is the source of the manifest list passed to the worker pool. In this thread/script the CreateWorkerPool function must be imported from -the worker-pool-batch-processor module and called at the appropriate step with the arguments described above. +One runs in the main thread and is the source of the manifest list passed to the worker pool. In this thread/script the `CreateWorkerPool` function must be imported from the `worker-pool-batch-processor.js` module and called at the appropriate step with the arguments described above. -Likewise, the InitializeWorker function must be imported in a script representing all processes to be executed in a worker thread. This function must be called as the -last step in the script and must be passed the top-most function in the script who's name exactly matches the operation string passed to the CreateWorkerPool function. -This function (the operation passed as an argument to InitializeWorker) must return a Promise which either resolves with the result of a processed batch or rejects upon error or null result. +Likewise, the `InitializeWorker` function must be imported in a script representing all processes to be executed in a worker thread. This function must be called as the last step in the script and must be passed the top-most function in the script whose name exactly matches the operation string passed to the `CreateWorkerPool` function. +This function (the operation passed as an argument to `InitializeWorker`) must return a Promise which either resolves with the result of a processed batch or rejects upon error or null result.