nodejs: read from file and store to db, limit maximum concurrent db operations

I have a CSV file that I am reading in as a stream, and using transforms to convert to JSON and then asynchronously store each line to a DB.

The issue is that reading from the file is fast, and so leads to very large numbers of concurrent async DB operations, which causes the app to grind to a halt.

I'd like to limit the app such that a max of N outstanding DB operations are in progress at any given time.

This is the basic core of my _transform function:

parser._transform = function(data, encoding, done) {
    //push data rows
    var tick = this._parseRow(data);

    //Store tick
    db.set(tick.date, tick, function(err, result) {
      console.log(result);
      if(err) throw err;
    });

    this.push(tick);
    done();
};

I've looked at a few options, but these seemed the best candidates:

  • Use the async api 'forEachLimit'
    • The problem I see here is that in my stream transform, I am only operating on one object (line from file) when issuing operations.
    • Reading whole file in isn't feasible due to size
  • Use an asynchronous, parallel, concurrency limited solution as described here, in section 7.2.3:
    • http://book.mixu.net/node/ch7.html
    • The problem for me here is what to do in the case 'limit' is reached.
    • Spinning or using setTimeout seems to use up all scheduled time and prevents my DB callbacks which should decrement the 'running' counter being initiated.

This was my initial attempts at the 'concurrency limited solution':

var limit = 100;
var running = 0;

parser._transform = function(data, encoding, done) {
  //push data rows
  var tick = this._parseRow(data);

  this.push(tick);
  //Store tick to db
  if (running < limit) {
    console.log("limit not reached, scheduling set");
    running++;
    cb.set(tick.date, tick, function(err, result) {
      running--;
      console.log("running is:" + running);
      console.log(result);
      if(err) throw err;
    });
  } else {
    console.log("max limit reached, sleeping");
    setTimeout(this._transform(data, encoding, done),1000);
  }
  done();
};

I've only started node.js this week, so I'm not clear what the correct model for solving this is.

Note: Couple of things I am aware of that is that this should at least be an exponential backoff if using the latter model, and there should be some 'max backoffs' system in place, so as not to blow out the call stack. Tried to keep it simple here for now though.

The concurrency limited solution option is the approach I would take but, instead of implementing that myself, I'd just use the async module. Specifically, the queue method.

Something like:

var dbQueue = async.queue(function(tick, callback) {
    db.set(tick.date, tick, function(err, result) {
        console.log(result);
        callback(err, result);
    });
}, 3); // the last arg (3) is the concurrency level; tweak as needed

parser._transform = function(data, encoding, done) {
    //push data rows
    var tick = this._parseRow(data);

    dbQueue.push(tick);

    this.push(tick);
    done();
};

That will limit your db operations to 3 at a time. Additionally, you can use the queue's saturated and empty events to pause/resume your stream to keep things even more limited in terms of resource use (which will be nice if you're reading really large files). That would look something like:

dbQueue.saturated = function() {
    parser.pause();
}

dbQueue.empty = function() {
    parser.resume();
}

Database is limited to one simultaneous disk write at any given time. With this in mind any parallel write slows down the whole operation. If file is small enough, try to read it whole into the memory, then write it to database in one operation. Otherwise split it into as big chunks as you can then query them one after another.