Socket.io 1.0.6 , nodejs CPU over 100%

I am creating a web server that manages subscriptions to live data. This data is being streamed to nodejs over TCP and then I stream to client that has subscribed to particular topic over websockets using Socket.io.

Here is the workflow.

Client sends a subscribe request on a particular topic. Nodejs application will proxy this subscription to java application that will then start pushing data to nodejs over TCP. Java application initiates 9 different socket connections for each possible category of news that can be listened on, one category per socket

An important thing to mention is that this data can be huge because when client subscribes they first get historical data(up to 45MB) and every subsequent push contains a small increment.

The problem that I am having is that on initial push nodejs application uses over 100% of CPU. Initially I thought that the issue was with parsing data that was being sent to me. After days of debugging I found out that when commenting out socket.emit(data.topic, data) CPU drops to 1% - 3%. That makes me believe that the issue that I am having is with socket.io.

Am I misusing socket.io here? Are there any alternatives that can push on different topics/channels? Is there something wrong in my code?

Thank you for your time!

//needed for REST subscribe services
var app = require('express')()
    //needed for HTTP server
    , server = require('http').createServer(app) 
    //WebSocket library (TODO see how to implement this)
    //, sockjs = require('sockjs') 
    //backend subscriptions
    , subUtil = require('./subscriptions/subscriptionManager')
    , bodyParser = require('body-parser')
    //Using Socket.io for now
    , io = require('socket.io').listen(server)
    , config = require('./settings/config')
    , tcpServer = require('./servers/tcpServer')
    //logger
    , logger = require('./logging/logger');

//Parser used for parsing JSON out of HTTP body
app.use(bodyParser.json()); 
// error handlers
app.use(function(err, req, res, next){
  logger.log.error(err.stack);
  res.send(500, 'Something broke!');
});

app.post('/subscribe/worldnews', subUtil.subWorldNews);
app.post('/subscribe/sportnews', subUtil.subSoirtNews);
app.post('/subscribe/sciencenews', subUtil.subScienceNews);
app.post('/subscribe/townnews', subUtil.subTownNews);
app.post('/subscribe/countrynews', subUtil.subCountryNews); 
app.post('/subscribe/regionnews', subUtil.subRegionNews);
app.post('/subscribe/weathernews', subUtil.subWeatherNews);
app.post('/subscribe/hotnews', subUtil.subHotNews);
app.post('/subscribe/relatednews', subUtil.subRelatedNews);
app.post('/unsubscribe', subUtil.unsubscribe);

//Start tcp server
tcpServer.startServer();
//Set max number of listeners, 100 users can connect
tcpServer.setMaxListeners(100);
//Create Socket.IO connection
io.sockets.on('connection', function (socket) {

    //handling incoming backend data
    var incomingHandler = function(data) {
        //send data to the user
        socket.emit(data.topic, data);
    };
    //Data is comming from backend 
    tcpServer.on('incoming', incomingHandler);
    //CLient has disconnected
    socket.on('disconnect', function (discData) {
        logger.log.info('User has disconnected');
        //remove listener
        tcpServer.removeListener('incoming', incomingHandler);
        //unsubscribe all topics with this token
        subUtil.unsubscribeAll(token); 
    });
});
//HTTP server listening
server.listen(config.httpPort);
logger.log.info('TCP connection on port ' + config.tcpPort + '; HTTP connection on the port ' + config.httpPort);

/*************TCP server module is bellow*************************/

//import utils because it is easier to do inheritance
var util = require('util')
    //import core events
    , EventEmitter = require('events').EventEmitter
    , config = require('../settings/config')
    //needed for TCP connection
    , net = require('net')
    , JsonSocket = require('json-socket')
    , logger = require('../logging/logger');

var TcpServer = function() {
        EventEmitter.call(this);
}

//inherit from events
util.inherits(TcpServer, EventEmitter);

TcpServer.prototype.startServer = function(incomingData) {
    //make a reference to postRequest function(important)
    var self = this;

    var tcpServer = net.createServer(function(sock) {
        logger.log.info('backend connected');
        //Handle socket error
        sock.on('error', function(err) {
                logger.log.error('There was a socket error :( \r\n' + err.stack);
        });

        sock.on('data', function(data) {
            logger.log.info('Incoming data');
            _onData(data, self)
        })
        var _contentLength = null;
        var _buffer = '';

        var _onData = function(data, ref) {
            data = data.toString();
            try {
                _handleData(data, ref);
            } catch (e) {
                logger.log.error(e);
            }
        }
        var _handleData = function(data, ref) {
            _buffer += data;
            if (_contentLength == null) {
                var i = _buffer.indexOf('#');
                //Check if the buffer has a #, if not, the end of the buffer string might be in the middle of a content length string
                if (i !== -1) {
                    var rawContentLength = _buffer.substring(0, i);
                    _contentLength = parseInt(rawContentLength);
                    if (isNaN(_contentLength)) {
                        _contentLength = null;
                        _buffer = '';
                        var err = new Error('Invalid content length supplied ('+rawContentLength+') in: '+ _buffer);
                        err.code = 'E_INVALID_CONTENT_LENGTH';
                        throw err;
                    }
                    _buffer = _buffer.substring(i+1);
                }
            }
            if (_contentLength != null) {
                if (_buffer.length == _contentLength) {
                    _handleMessage(_buffer, ref);
                } else if (_buffer.length > _contentLength) {
                    var message = _buffer.substring(0, _contentLength);
                    var rest = _buffer.substring(_contentLength);
                    _handleMessage(message, ref);
                    _onData(rest, ref);
                }
            }
        }

        var _handleMessage = function(data, ref) {
            _contentLength = null;
            _buffer = '';
            var message;
            try {
                message = JSON.parse(data);
            } catch (e) {
                var err = new Error('Could not parse JSON: '+e.message+'\nRequest data: '+data);
                err.code = 'E_INVALID_JSON';
                throw err;
            }
            message = message || {};
            ref.emit('incoming', message);
        }
    });

    //Handle server error
    tcpServer.on('error', function(err) {
        logger.log.error('There was a connection err \r\n' + err.stack);
    });
    //listen fot backend data on port
    tcpServer.listen(config.tcpPort);
}

module.exports = new TcpServer();