I have a CSV file that I am reading in as a stream, and using transforms to convert to JSON and then asynchronously store each line to a DB.
The issue is that reading from the file is fast, so it leads to a very large number of concurrent async DB operations, which causes the app to grind to a halt.
I'd like to limit the app such that a max of N outstanding DB operations are in progress at any given time.
This is the basic core of my _transform function:
parser._transform = function(data, encoding, done) {
    //push data rows
    var tick = this._parseRow(data);
    //Store tick
    db.set(tick.date, tick, function(err, result) {
        console.log(result);
        if (err) throw err;
    });
    this.push(tick);
    done();
};
I've looked at a few options, and the 'concurrency limited solution' seemed the best candidate. This was my initial attempt at it:
var limit = 100;
var running = 0;
parser._transform = function(data, encoding, done) {
    //push data rows
    var tick = this._parseRow(data);
    this.push(tick);
    //Store tick to db
    if (running < limit) {
        console.log("limit not reached, scheduling set");
        running++;
        db.set(tick.date, tick, function(err, result) {
            running--;
            console.log("running is: " + running);
            console.log(result);
            if (err) throw err;
        });
    } else {
        console.log("max limit reached, sleeping");
        //pass a function reference so setTimeout doesn't invoke _transform immediately
        setTimeout(this._transform.bind(this, data, encoding, done), 1000);
        //don't call done() here; the retried _transform will call it
        return;
    }
    done();
};
I've only started using node.js this week, so I'm not clear on the correct model for solving this.
Note: a couple of things I'm aware of: the retry approach above should at least use exponential backoff, and there should be some 'max backoffs' cap in place so as not to blow out the call stack. I've tried to keep it simple here for now, though.
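For clarity, roughly what I mean by that backoff/cap idea is something like the following untested sketch. It reuses running, limit and db from above; scheduleSet and maxRetries are just illustrative names/values:

var maxRetries = 5;

function scheduleSet(tick, attempt) {
    if (running < limit) {
        running++;
        db.set(tick.date, tick, function(err, result) {
            running--;
            if (err) throw err;
        });
    } else if (attempt < maxRetries) {
        //wait exponentially longer before each retry
        setTimeout(function() {
            scheduleSet(tick, attempt + 1);
        }, 1000 * Math.pow(2, attempt));
    } else {
        throw new Error("max backoffs exceeded for " + tick.date);
    }
}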
The concurrency limited solution is the approach I would take, but instead of implementing it myself I'd just use the async module. Specifically, its queue method.
Something like:
var dbQueue = async.queue(function(tick, callback) {
    db.set(tick.date, tick, function(err, result) {
        console.log(result);
        callback(err, result);
    });
}, 3); // the last arg (3) is the concurrency level; tweak as needed
parser._transform = function(data, encoding, done) {
    //push data rows
    var tick = this._parseRow(data);
    dbQueue.push(tick);
    this.push(tick);
    done();
};
That will limit your db operations to 3 at a time. Additionally, you can use the queue's saturated and empty callbacks to pause/resume your stream and keep resource use bounded (which will be nice if you're reading really large files). That would look something like:
dbQueue.saturated = function() {
    parser.pause();
};
dbQueue.empty = function() {
    parser.resume();
};
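In the versions of async where saturated and empty are assigned as plain callback properties like this, the queue also exposes a drain callback that fires once every queued task has completed, which is handy if you need to know when it's safe to clean up. A small sketch, assuming the same dbQueue as above:

dbQueue.drain = function() {
    //every queued db.set has called back at this point
    console.log("all db writes complete");
};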
A database is typically limited to one simultaneous disk write at any given time, so parallel writes slow down the whole operation. If the file is small enough, try reading it entirely into memory and then writing it to the database in one operation. Otherwise, split it into chunks as large as you can and write them one after another.
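For example, here is a rough sketch of the chunked variant, reusing the async module from the other answer and assuming the same parser and db.set(key, value, callback) from the question (batchSize is an arbitrary illustrative value):

var async = require('async');

var batchSize = 1000;
var batch = [];

parser._transform = function(data, encoding, done) {
    var tick = this._parseRow(data);
    this.push(tick);
    batch.push(tick);
    if (batch.length < batchSize) {
        return done();
    }
    var toWrite = batch;
    batch = [];
    //write this chunk one record after another, then ask the stream for more input
    async.eachSeries(toWrite, function(t, cb) {
        db.set(t.date, t, cb);
    }, done);
};

parser._flush = function(done) {
    //write whatever is still buffered when the input ends
    async.eachSeries(batch, function(t, cb) {
        db.set(t.date, t, cb);
    }, done);
};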