node.js + socket.io + multiple database calls needed for result

I need help.

I've been trying to wrap my head around async programming with node.js and socket.io for a day now. I understand that I need some flow control but I don't seem to understand how to properly implement it.

I have a Redis datastore in which the module names are stored in a set called 'modules' (let's say 'moda' and 'modb'). The instances of each module are stored in the sets 'moda:instances' and 'modb:instances', and the properties of each instance are stored as a hash under keys such as 'moda:instancea' and 'modb:instanceb'.

I am trying to get the following JSON:

{"moda": {"instancea": {"property1": "value1", "property2": "value2"}}, "modb": {"instanceb": {"property1": "value1"}}}

Could someone give me a little push in the right direction?

Here is my current code:

var io = require('socket.io').listen(2000);
var redis = require('redis').createClient();
var http = require('http');
var async = require('async');
var step = require('step');

// For every client connection: relay Redis "notification" messages to the
// socket, and on a 'modules' event build `response` from the Redis sets and
// log it as JSON.
io.sockets.on('connection', function (socket) {

// A separate Redis client is created per connection for the subscription;
// the module-level `redis` client is used for the smembers reads below.
var notifications = require('redis').createClient();
notifications.subscribe("notification");
notifications.on("message", function (channel, message) {
  // Forward each published message to this socket and log it.
  socket.send(message);
  console.log(channel + ':' + message);
});

// 'modules' handler. `params` and the handler-level `callback` are not used
// in this body; the task functions below shadow `callback` with their own.
socket.on('modules', function(params, callback) {
  var response = {};
  async.series([

  // Task 1: read the 'modules' set, then each '<module>:instances' set, and
  // create an empty object in `response` for every module and instance.
  function (callback) {
    console.log('1>');
    // NOTE(review): `err` is not checked here or in the nested smembers call.
    redis.smembers('modules', function (err, modules) {
      async.forEachSeries(modules, function(module, moduleCallback) {
        response[module] = {}

        redis.smembers(module + ':instances', function(err, instances) {
          async.forEachSeries(instances, function(instance, instanceCallback) {
            response[module][instance] = {}
            console.log('2>' + module + ':' +instance);
            instanceCallback();
          });
          // Runs as soon as the inner forEachSeries call returns; no
          // completion callback is passed to that call.
          moduleCallback();
        });
      });
      // Runs as soon as the outer forEachSeries call returns (no completion
      // callback is passed to it), so task 1 is reported as done here.
      callback();
    });
  },
  // Task 2: only logs a marker and completes immediately.
  function (callback) {
    console.log('3');
    callback();
  }

// Final async.series callback: logs whatever `response` holds at that point.
], function() {
  console.log(JSON.stringify(response));
});


});

});

The output from this code is:

   info  - socket.io started
   debug - client authorized
   info  - handshake authorized JMMn1I8aiOMGCMPOhC11
   debug - setting request GET /socket.io/1/websocket/JMMn1I8aiOMGCMPOhC11
   debug - set heartbeat interval for client JMMn1I8aiOMGCMPOhC11
   debug - client authorized for 
   debug - websocket writing 1::
1>
3
{"moda":{}}
2>moda:instancea
2>moda:instanceb
2>modb:instancea

The problem comes from the fact that forEachSeries requires an additional callback as a third parameter (to be called when the whole processing is complete). You cannot just put some code after forEachSeries, hoping it will be called once the iteration is complete.

Here is your code modified:

// Builds `response` as { <module>: { <instance>: {} } } from the Redis sets
// 'modules' and '<module>:instances', then prints it as JSON.
var response = {};
async.series([

// Step 1: walk every module, and every instance of each module, in sequence.
function (callback) {
  console.log('1>');
  redis.smembers('modules', function (err, modules) {
    // Propagate a Redis error to the final callback instead of ignoring it.
    if (err) { return callback(err); }
    async.forEachSeries(modules, function (module, moduleCallback) {
      response[module] = {};

      redis.smembers(module + ':instances', function (err, instances) {
        // An error passed to moduleCallback stops the outer iteration.
        if (err) { return moduleCallback(err); }
        async.forEachSeries(instances, function (instance, instanceCallback) {
          response[module][instance] = {};
          console.log('2>' + module + ':' + instance);
          instanceCallback();
        }, moduleCallback); // completion callback: this module is done
      });
    }, callback); // completion callback: all modules done, step 1 finished
  });
},
// Step 2: runs only after step 1 has signalled completion.
function (callback) {
  console.log('3');
  callback();
}],
// Final callback: receives the first error from any step, if there was one.
function (err) {
  if (err) {
    console.error(err);
    return;
  }
  console.log(JSON.stringify(response));
});

Note how callback and moduleCallback are passed as the third parameter of forEachSeries. The output is:

1>
2>moda:instancea
2>moda:instanceb
2>modb:instancea
3
{"moda":{"instancea":{},"instanceb":{}},"modb":{"instancea":{}}}

which I guess is what you expected.

Additional remark: forEachSeries will process everything in sequence, the next operation waiting for the previous one to complete. This will generate plenty of roundtrips to Redis. forEach should be much more efficient here to leverage pipelining.

Also take a look at the concept of promises: with promises you can express the flow in a much more readable way.

First you need to prepare promise versions of redis methods that you're using:

// Add a `psmembers` method to the Redis client prototype by wrapping the
// callback-style `smembers` with deferred's `promisify`, so every client
// instance gets the new method.
var promisify = require('deferred').promisify;
var RedisClient = require('redis').RedisClient;
RedisClient.prototype.psmembers = promisify(RedisClient.prototype.smembers);

Then you can construct your flow as:

// `response` accumulates { <module>: { <instance>: {} } }. It is declared here
// so the snippet does not rely on a variable from the earlier example.
var response = {};

console.log('1>');
redis.psmembers('modules').map(function (module) {
  response[module] = {};
  // Return the inner promise so the outer chain can wait for this module's
  // instances (per deferred's map semantics) before `end` fires.
  return redis.psmembers(module + ':instances').map(function (instance) {
    response[module][instance] = {};
    console.log('2>' + module + ':' + instance);
  });
}).end(function () {
  console.log('3');
  console.log(JSON.stringify(response));
});

Check: https://github.com/medikoo/deferred