Reading CSV file and sending data in intervals with websockets (Node, Socket.io)

I'm relatively new to Node and Express.js. I'm trying to create a websocket server that pushes CSV data line by line, at irregular intervals stored in the file itself. The CSV structure is something like this: [timeout [ms], data1, data2, data3 ...]

I've successfully created a websocket server which communicates with the client.

I'm looking for the best way to do something like this:

1. Read a line of the CSV file.
2. Send the line over the websocket.
3. Pause reading for the period of time stored in the first value of the row.
4. Resume reading after the interval has passed, and go back to step 1.

This is as far as I've got (please feel free to trash my code completely, as it might be very wrong - as I said, I'm new to this). It seems like pause() doesn't do anything.

var $   = require('jquery'),
    csv = require('csv');

exports.index = function(server){
  var io = require('socket.io').listen(server);

  io.sockets.on('connection', function (socket) {

    socket.on('startTransmission', function(msg) {
      csv()
      .from.path('C:/dev/node_express/csv/test.csv', { delimiter: ',', escape: '"' })
      .on('record', function(row, index){
        var rowArray = $.parseJSON(JSON.stringify(row));
        var json = {},
            that = this;
        // `keys` holds the column names and is defined elsewhere
        $.each(rowArray, function(i, value){
          json[keys[i]] = value;
        });
        socket.emit('transmitDataData', json);
        // this.pause(); // I guess around here is where I'd like to pause
        // setTimeout(function(){
        //   that.resume(); // and resume here after the timeout, stored in the first value (rowArray[0])
        // }, rowArray[0]);
      });
    });
  });
};

The commented-out code unfortunately does not work: all data is sent immediately, row after row, and the function never pauses.

I ran into the same sort of thing with another use case. The issue is that calling pause() on the stream pauses the underlying stream reading, but not the CSV record parsing, so the record event can still fire with the remainder of the records that made up the last chunk read from the stream. In my case, I synchronized them like this:

var rows=0, actions=0;

stream.on('record', function(row, index){

    rows++;

    // pause here, but expect more record events until the raw read stream is exhausted
    stream.pause();

    // runner.do() stands in for whatever async work you do per row
    runner.do(row, function(err, result) {

        actions++;

        // when actions have caught up to rows read, read more rows.
        if (actions == rows) {
            stream.resume();
        }
    });
});

In your case, I'd buffer the rows and release them with a timer. Here's an untested refactoring, just to give you an idea of what I mean:

var $ = require('jquery'),
    csv = require('csv');

exports.index = function(server){

  var io = require('socket.io').listen(server);
  io.sockets.on('connection', function (socket) {

      socket.on('startTransmission', function(msg) {

        var timer = null,
            buffered = [],
            stream = csv().from.path('C:/dev/node_express/csv/test.csv', { delimiter: ',', escape: '"' });

        function transmit(row) {        
            socket.emit('transmitDataData', row);                                     
        }       

        function drain(timeout) {                                                    
            if (!timer) {
                timer = setTimeout(function() {                                    
                    timer = null;
                    if (buffered.length<=1) { // get more rows ahead of time so we don't run out. otherwise, we could skip a beat.
                        stream.resume(); // get more rows
                    } else {                        
                        var row = buffered.shift();
                        transmit(row);
                        drain(row[0]);                        
                    }

                }, timeout);               
            }                
        }

        stream.on('record', function(row,index){                        
            stream.pause();                                                                                   
            if (index == 0) {                            
                transmit(row);                                               
            } else {                            
                buffered.push(row);                                   
            }                                                       
            drain(row[0]); // assuming row[0] contains a timeout value.                                                                  
        });

        stream.on('end', function() {
            // no more rows. wait for buffer to empty, then cleanup.
        });

        stream.on('error', function() {
            // handle error.
        });

      });

  });
};
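
To fill in the 'end' stub, one option is a flag that lets the timer flush whatever is still buffered before signaling the client. This is a rough, untested sketch in the same spirit as the above: it replaces the drain() and 'end' pieces of the previous block, drops the look-ahead check for brevity, and the done flag and 'transmissionComplete' event name are my own placeholders, not anything socket.io defines:

var done = false;

stream.on('end', function() {
    done = true; // no more records will arrive; let the timer empty the buffer
});

function drain(timeout) {
    if (!timer) {
        timer = setTimeout(function() {
            timer = null;
            if (buffered.length) {
                var row = buffered.shift();
                transmit(row);
                drain(row[0]); // keep pacing off each row's timeout
            } else if (done) {
                socket.emit('transmissionComplete'); // hypothetical completion event
            } else {
                stream.resume(); // buffer is empty but the file isn't; get more rows
            }
        }, timeout);
    }
}

You'd still want to cover the case where 'end' fires after the buffer has already drained and no timer is pending, since nothing would emit the completion event then.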