Say I have a directory foo, with some number of subdirectories. Each of these subdirectories has between 0 and 5 files of variable length which I would like to process. My initial code looks like so:
// Set the schema search path first, then walk every subdirectory of srcpath
// and load whichever of the expected shapefiles it contains.
pool.query(`
SET SEARCH_PATH TO public,os_local;
`).then(() => fs.readdirSync(srcpath)
  .filter(file => fs.lstatSync(path.join(srcpath, file)).isDirectory())
  .map(dir => {
    // Built once so the access check and the open call cannot drift apart.
    const buildingShp = `${srcpath + dir}/${dir}_Building.shp`
    fs.access(buildingShp, fs.constants.R_OK, (err) => {
      // An unreadable or missing file is skipped without logging.
      if (!err) {
        openShapeFile(buildingShp).then((source) => source.read()
          // dbWrite upserts one record, then re-enters itself with the next
          // read until the source reports done.
          .then(function dbWrite (result) {
            if (result.done) {
              console.log(`done ${dir}`)
            } else {
              const query = `INSERT INTO os_local.buildings(geometry,
id,
featcode,
version)
VALUES(os_local.ST_GeomFromGeoJSON($1),
$2,
$3,
$4) ON CONFLICT (id) DO UPDATE SET
featcode=$3,
geometry=os_local.ST_GeomFromGeoJSON($1),
version=$4;`
              // NOTE(review): geoJson and version are not defined anywhere in
              // this snippet; presumably they come from the enclosing scope - confirm.
              const params = [
                geoJson.split('"[[').join('[[').split(']]"').join(']]'),
                result.value.properties.ID,
                result.value.properties.FEATCODE,
                version
              ]
              // Return the whole chain so read/insert failures reach the outer
              // catch, and release the client only once its query has settled
              // (releasing earlier hands a busy client back to the pool).
              return pool.connect().then(client => client.query(query, params)
                .then(
                  () => {
                    client.release()
                  },
                  (queryErr) => {
                    client.release()
                    // A failed insert is logged and skipped; the import carries on.
                    console.log(queryErr, query, ...params)
                  })
                .then(() => source.read().then(dbWrite)))
            }
          })).catch(err => console.log('No Buildings', err))
      }
    })
    // NOTE(review): this path has a double underscore, unlike the other
    // shapefile names here - confirm it is not a typo.
    fs.access(`${srcpath + dir}/${dir}__ImportantBuilding.shp`, fs.constants.R_OK, (err) => {
      //read file one line at a time//spin up connection in pg.pool, insert data
    })
    fs.access(`${srcpath + dir}/${dir}_Road.shp`, fs.constants.R_OK, (err) => {
      //read file one line at a time//spin up connection in pg.pool, insert data
    })
    fs.access(`${srcpath + dir}/${dir}_Glasshouse.shp`, fs.constants.R_OK, (err) => {
      //read file one line at a time//spin up connection in pg.pool, insert data
    })
    fs.access(`${srcpath + dir}/${dir}_RailwayStation.shp`, fs.constants.R_OK, (err) => {
      //read file one line at a time//spin up connection in pg.pool, insert data
    })
  }))
This mostly works, but in every subdirectory it ends up waiting for the longest file to be fully processed, so in practice there is only ever one active connection to the database.
Is there a way I could rearchitect this to make better use of my computational resources, while limiting the number of active Postgres connections and forcing the code to wait until a connection becomes available? (I set the maximum number of connections to 20 in the pg poolConfig for node-postgres.)
One way to improve the performance of your code would be to use Promise.all to process all the files in each subdirectory concurrently. You can do this by replacing the map function with the following:
Promise.all(fs.readdirSync(srcpath) .filter(file => fs.lstatSync(path.join(srcpath, file)).isDirectory()) .map(async dir => { const promises = [] promises.push(new Promise((resolve, reject) => { fs.access(`${srcpath + dir}/${dir}_Building.shp`, fs.constants.R_OK, (err) => { if (!err) { openShapeFile(`${srcpath + dir}/${dir}_Building.shp`).then((source) => source.read() .then(function dbWrite (result) { if (result.done) { console.log(`done ${dir}`) resolve() } else { const query = `INSERT INTO os_local.buildings(geometry, id, featcode, version) VALUES(os_local.ST_GeomFromGeoJSON($1), $2, $3, $4) ON CONFLICT (id) DO UPDATE SET featcode=$3, geometry=os_local.ST_GeomFromGeoJSON($1), version=$4;`return pool.connect().then(client => { client.query(query, [geoJson.split('"[[').join('[[').split(']]"').join(']]'), result.value.properties.ID, result.value.properties.FEATCODE, version ]).then((result) => { return source.read().then(dbWrite) }).catch((err) => { console.log(err, query, geoJson.split('"[[').join('[[').split(']]"').join(']]'), result.value.properties.ID, result.value.properties.FEATCODE, version ) return source.read().then(dbWrite) }) client.release() }) } })).catch(err => console.log('No Buildings', err)) } else { resolve() } }) })) promises.push(new Promise((resolve, reject) => { fs.access(`${srcpath + dir}/${dir}__ImportantBuilding.shp`, fs.constants.R_OK, (err) => { //read file one line at a time//spin up connection in pg.pool, insert dataif (err) { resolve() } }) })) promises.push(new Promise((resolve, reject) => { fs.access(`${srcpath + dir}/${dir}_Road.shp`, fs.constants.R_OK, (err) => { //read file one line at a time//spin up connection in pg.pool, insert dataif (err