Express + Socket.IO + RabbitMQ (node-amqp)

I'm having a hard time putting these three together, probably because I haven't properly understood how routing works in Express.

I have a RabbitMQ queue of event updates, and each update is identified by its event id. On the page for a given event, I want to receive only the updates for that id.

Queue: 1316, 1539, 3486, 3479, 1316, 3890, 3479, ... -> Fed from the DB indefinitely.
www.example.com/event/1316 -> Gets from the queue just messages with id 1316
www.example.com/event/3479 -> Gets from the queue just messages with id 3479

My code works fine when I load the first event, but when I load a second one in a different window, that window gets messages for both events, and if I load a third, it gets messages for all three ids.

app.js

var express = require('express')  
, http = require('http');
var app = express();
var server = http.createServer(app);
var io = require('socket.io').listen(server, { log: false });
require('./io')(io);

var amqp = require('amqp');
var rabbitMQ = amqp.createConnection({ host: 'localhost' });

rabbitMQ.on('ready', function() {
  console.log('Connected to RabbitMQ');
  io.sockets.on('connection', function (socket) {
    console.log('Socket connected: ' + socket.id);
    rabbitMQ.queue('offer', { autoDelete: false, durable: false, exclusive: false }, function(q) {    
      q.bind('#'); // Catch all messages    
      q.subscribe(function (message) {
        obj = JSON.parse(message.data.toString());
        //socket.broadcast.to(obj.id).emit('message', obj);
        io.sockets.in(obj.id).emit('message', obj);
      });
    });
  });
});

var routes = require('./routes')
, event = require('./routes/event');

app.get('/', routes.index);
app.get('/event/:id', event.index);

server.listen(app.get('port'), function(){
  console.log("Express server listening on port " + app.get('port'));
});

io.js

var socketio = function (io) { 
  if (!io) return socketio._io;  
  socketio._io = io;
} 

module.exports = socketio;

routes/event.js

var io = require('../io')();

exports.index = function(req, res) {
  io.sockets.on('connection', function (socket) {
    socket.join(req.params.id);
  });
  res.render('event', { title: 'Event' });
};

Thanks!

You are receiving them all because the socket joins rooms but never leaves them. The Rooms page of the Socket.IO wiki shows, at the bottom, io.sockets.manager.roomClients[socket.id] as a way to list the rooms a socket has joined (which I suspect will include all three if you've visited all three links).

You might want to try going through this list of rooms and leaving any that aren't the current room, to see if that solves the problem.

Edit

OK, there are two possible solutions. I tested my theory and it holds: you receive messages for every room you've joined, and you keep receiving them until you leave. Here are the options:

1. Leave all other rooms when joining a room

io.sockets.on('connection', function (socket) {
    var room = req.params.id;

    var roomKeys = Object.keys(io.sockets.manager.roomClients[socket.id]);
    roomKeys.forEach(function(key) {
        if (key === '' || key === '/' + room) return;
        socket.leave(key.slice(1));
    });

    socket.join(room);
});

As said, I tested this. It works.

2. Don't send a message event, send a {room name} event

Instead of emitting a 'message' event, you could emit an event named after the room. In your q.subscribe() callback, replace io.sockets.in(obj.id).emit('message', obj); with socket.emit(obj.id, obj); and have the JavaScript client listen only for that page's event name (based on the URL path).

I also tested this. It also works. It's also simpler (I think) because it only requires the .emit() in your q.subscribe() callback, which means you save the 'room management' stuff.
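For the client side of option 2, something like the following could work. It's only a sketch: eventIdFromPath is a hypothetical helper name, and it assumes event ids are numeric and that the server emits the id as a string event name.

```javascript
// Hypothetical helper: extracts the event id from a path such as '/event/1316'.
// Returns null when the path is not an event page.
function eventIdFromPath(pathname) {
  var match = /^\/event\/(\d+)\/?$/.exec(pathname);
  return match ? match[1] : null;
}

// In the browser (assuming the Socket.IO client script is loaded):
//   var socket = io.connect();
//   var id = eventIdFromPath(window.location.pathname);
//   if (id) socket.on(id, function (obj) { /* render the update */ });
```

Since each page only listens for its own id, updates for other events are simply ignored by that page.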

After some trial and error, I understood what I was doing wrong: calling io.sockets.on('connection') inside the route handler registered a new listener on every request, so the event was handled multiple times. In the end, the simplest approach was the right one.
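To illustrate the duplication, here is a small sketch that uses Node's EventEmitter as a stand-in for io.sockets. The names handleRequest and makeSocket are made up for the demo; no real Socket.IO is involved.

```javascript
var EventEmitter = require('events').EventEmitter;

// Stand-in for io.sockets: just something that emits 'connection'.
var sockets = new EventEmitter();

// Mimics the original routes/event.js: every request adds another listener,
// and each listener remembers the id of the request that created it.
function handleRequest(id) {
  sockets.on('connection', function (socket) {
    socket.join(id);
  });
}

// Fake socket that records the rooms it was asked to join.
function makeSocket() {
  var joined = [];
  return { joined: joined, join: function (room) { joined.push(room); } };
}

handleRequest('1316');
var first = makeSocket();
sockets.emit('connection', first);   // first page connects

handleRequest('3479');
var second = makeSocket();
sockets.emit('connection', second);  // second page connects

console.log(first.joined);   // [ '1316' ]
console.log(second.joined);  // [ '1316', '3479' ]
```

The second socket ends up in both rooms because the listener from the first request is still registered, which matches what I was seeing.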

app.js

var room = '';
var roomHandler = function(req, res, next) {
  if (req.path.match('event')) {
    room = req.params.id;
  } 
  next(); // Passing the request to the next handler in the stack.
}

io.sockets.on('connection', function (socket) {    
  socket.join(room);
});

rabbitMQ.on('ready', function() { 
  rabbitMQ.queue('offer', { autoDelete: false, durable: false, exclusive: false }, function(q) {      
    q.bind('#'); // Catch all messages  
    q.subscribe(function (message) {
      var obj = JSON.parse(message.data.toString()); // var avoids leaking a global
      io.sockets.in(obj.id).emit('message', obj);
    });
  });
});

app.get('/', routes.index);
app.get('/event/:id', roomHandler, event.index);
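One caveat: room is a single module-level variable shared by all requests, so if two event pages are requested before either socket connects, both sockets join the room of the later request. A possible alternative, sketched below, is to let each client announce its own room. The 'join' event name and attachJoinHandler are hypothetical, and the sketch assumes numeric event ids.

```javascript
// Sketch: each client tells the server which room it wants,
// so no state is shared between HTTP requests and socket connections.
// 'join' is a made-up application event name, not part of Socket.IO itself.
function attachJoinHandler(io) {
  io.sockets.on('connection', function (socket) {
    socket.on('join', function (id) {
      var room = String(id);
      // Ignore anything that is not a plain numeric id.
      if (!/^\d+$/.test(room)) return;
      socket.join(room);
    });
  });
}

// Client side (in the page served for /event/:id), assuming the
// Socket.IO client script is loaded:
//   var socket = io.connect();
//   socket.emit('join', window.location.pathname.split('/')[2]);
```

With this in place, calling attachJoinHandler(io) once in app.js would replace both roomHandler and the io.sockets.on('connection') block, and the q.subscribe() callback could stay as it is.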