I need to send data to a TCP port (using node.js) wait for a reply and then return the reply to the sender. This needs to be able to handle one request at a time or many requests at a time.
The other end of the TCP port is getting data from a serial port and the data is not self descriptive so the node.js logic must send one request at a time and associate the reply with that request.
I am new to node.js and to javascript and the asynchronous nature was baffling to me at first.
I have finally created a module that accomplishes this task.
I would like to share this with others who have similar needs.
But the module may need refinement that I am not aware of.
Questions:
******************** PSEUDO CODE ********************
When object is created
open port, initialize objects and create variables
event : on open port
if open is successful then transition to open
if there are items in the queue then transmit first item in queue
function : enqueue (receive request and put it in the queue)
add request to end of queue
if port is closed then open port
if port state is idle then transmit first item in queue
if port state is busy then do nothing, the data event will take care of it when it is triggered
function : transmit
transition to busy
write first item in queue to port
event: on data
send data reply to callback
transition to idle
remove request from queue
if queue still has items then transmit
event : on open
transition to open
if queue has items then transmit
event: on close
transition to closed
if queue has items then open port
serialSocket.js
/*jslint node: true, vars: true, plusplus: true, devel: true, nomen: true, indent: 4, maxerr: 50, globalstrict: true*/ /*global require*/
"use strict";
var net = require('net');
/**
 * Request/reply client that funnels every request through one TCP socket,
 * keeping at most one request in flight so each reply can be paired with the
 * request that caused it.
 *
 * Requests are queued with enqueue(data, callback). The head of the queue is
 * written to the socket; the next 'data' event is delivered to that request's
 * callback and the following request is then written.
 *
 * Limitations visible in this implementation:
 *  - Every 'data' event is treated as one complete reply. A reply that arrives
 *    split over several 'data' events is not reassembled.
 *  - While the queue is non-empty, every 'close' triggers a reconnect attempt
 *    (no back-off, no retry limit); the request at the head of the queue is
 *    written again after the reconnect.
 *
 * @constructor
 * @param {Object} setupParams
 * @param {number} setupParams.port - TCP port to connect to.
 * @param {string} setupParams.host - Host to connect to.
 * @param {boolean} [setupParams.openOnIdle] - Truthy: keep the connection open
 *        when the queue drains. Falsy (default): close it after the last reply
 *        and reopen on the next enqueue.
 * @param {boolean} [setupParams.verbose] - Truthy: log progress to the console.
 * @param {boolean} [setupParams.replyAsObject=true] - When truthy, replies are
 *        JSON.parse'd; a reply that is not valid JSON is delivered as
 *        {data: <raw chunk>, text: <chunk as string>}. When falsy the raw
 *        chunk is delivered unchanged.
 * @param {function(boolean)} [callback] - Called once: with true on the first
 *        'connect', or with false if an 'error' occurs first.
 */
function SerialSocket (setupParams, callback) {
    // closed/busy/idle keep their original values. connecting and closing are
    // transitional states during which nothing may be written and connect()
    // must not be called.
    var PORT_STATE = Object.freeze({closed: 1, busy: 2, idle: 3, connecting: 4, closing: 5});
    var self = this;

    // The misspelled property name is kept so existing readers of it still work.
    self.intiializing = true;
    self.queue = [];
    self.openOnIdle = setupParams.openOnIdle ? true : false;
    self.port = setupParams.port;
    self.host = setupParams.host;
    self.verbose = setupParams.verbose;
    if (self.verbose) {console.log('Creating Connecting to: ', self.host + ':' + self.port);}
    self.socket = net.createConnection(setupParams.port, setupParams.host);
    self.writeCount = 0;
    self.readCount = 0;
    self.concurrentCalls = 0;
    // createConnection() has already started connecting.
    self.portState = PORT_STATE.connecting;
    // reply as object is defaulted to true if not defined
    if (typeof setupParams.replyAsObject === 'undefined') {
        self.replyAsObject = true;
    } else {
        self.replyAsObject = setupParams.replyAsObject;
    }

    // Reports the outcome of the initial connection attempt exactly once.
    function reportInitResult(success) {
        if (!self.intiializing) {
            return;
        }
        self.intiializing = false;
        if (typeof callback === 'function') {
            callback(success);
        }
    }

    // Converts request data to the text that is written to the socket.
    // Returns undefined when the data cannot be sent.
    function toPayload(data) {
        var type = typeof data;
        if (type === "undefined") {
            console.log("sericalSocket.transmit can not handle undefined data.");
            return undefined;
        }
        if (type === "function") {
            console.log("sericalSocket.transmit can not handle functions");
            return undefined;
        }
        if (type === "string") {
            // text is sent verbatim
            return data;
        }
        // objects, arrays, numbers, booleans and null are sent as JSON text
        return JSON.stringify(data);
    }

    // Converts a received chunk into the value handed to the request callback.
    function toReply(data) {
        if (!self.replyAsObject) {
            return data;
        }
        try {
            return JSON.parse(data); // convert the reply from JSON to an object
        } catch (e) {
            try {
                return {data : data, text : data.toString()}; // raw data wrapped in an object if it is not JSON
            } catch (e2) {
                return {data : data, text : ""}; // if toString causes an exception then make the string empty
            }
        }
    }

    // Registers a listener that only logs, and only in verbose mode.
    function trace(eventName, message) {
        self.socket.on(eventName, function () {
            if (self.verbose) { console.log(message); }
        });
    }

    /**
     * Reconnects the socket. Does nothing unless the port is fully closed,
     * because connect() may only be re-issued after 'close' has been emitted.
     */
    self.openPort = function () {
        if (self.portState !== PORT_STATE.closed) {
            return;
        }
        self.portState = PORT_STATE.connecting;
        if (self.verbose) {console.log('Connecting to: ', self.host + ':' + self.port);}
        self.socket.connect(self.port, self.host, function () {
            if (self.verbose) {console.log('Connected');}
        });
    };

    /**
     * Ends the connection. The state becomes `closing` immediately so nothing
     * is written after end(); it becomes `closed` when 'close' is emitted.
     */
    self.closePort = function () {
        if (self.portState === PORT_STATE.closed || self.portState === PORT_STATE.closing) {
            return;
        }
        if (self.verbose) {console.log('Closing: ', self.host + ':' + self.port);}
        self.portState = PORT_STATE.closing;
        self.socket.end(function () {
            if (self.verbose) {console.log('Closed');}
        });
    };

    /**
     * Adds a request to the end of the queue.
     * @param {*} data - A string is sent verbatim; any other value is sent as
     *        JSON text. undefined and functions cannot be sent: they are
     *        logged and dropped, and their callback is never invoked.
     * @param {function(*)} [callback] - Receives the reply for this request.
     */
    self.enqueue = function (data, callback) {
        if (self.verbose) {console.log('adding to queue: ' , data);}
        self.queue.push({data : data, callback: callback});
        if (self.portState === PORT_STATE.closed) {
            self.openPort();
        } else if (self.portState === PORT_STATE.idle) {
            self.transmit();
        } // busy / connecting / closing: the data, connect or close handler picks the request up
    }; // end of enqueue

    /**
     * Writes the request at the head of the queue. Does nothing unless the
     * port is idle, so a second request can never be put in flight.
     */
    self.transmit = function () {
        var payload;
        if (self.portState !== PORT_STATE.idle) {
            return;
        }
        // Drop unsendable requests instead of leaving them at the head of the
        // queue, where they would block every later request.
        while (self.queue.length) {
            payload = toPayload(self.queue[0].data);
            if (typeof payload === 'string') {
                break;
            }
            self.queue.shift();
        }
        if (!self.queue.length) {
            return;
        }
        self.portState = PORT_STATE.busy;
        if (self.verbose) {console.log(self.queue[0].data);}
        self.concurrentCalls++;
        self.socket.write(payload, function () {
            self.writeCount++;
        });
    };

    self.socket.on('data', function (data) {
        var request;
        if (self.verbose) {
            console.log('Received message # ' + self.readCount + ', length = ' + data.length + ' cc = ' + self.concurrentCalls);
            console.log('data = ' + data);
        }
        self.readCount++;
        // Data that no request is waiting for cannot be paired with a callback.
        if (self.portState !== PORT_STATE.busy || !self.queue.length) {
            return;
        }
        self.concurrentCalls--;
        // Remove the request before running its callback and stay `busy`
        // meanwhile: an enqueue() made from inside the callback then only
        // queues, instead of re-sending the request that was just answered.
        request = self.queue.shift();
        if (typeof request.callback === 'function') {
            request.callback(toReply(data));
        }
        if (self.portState !== PORT_STATE.busy) {
            // the callback closed the port
            return;
        }
        self.portState = PORT_STATE.idle;
        if (self.queue.length) {
            self.transmit();
        } else if (!self.openOnIdle) {
            self.closePort();
        }
    });

    self.socket.on('close', function () {
        //transition to closed port state
        self.portState = PORT_STATE.closed;
        // nothing can be outstanding on a closed connection
        self.concurrentCalls = 0;
        if (self.queue.length) {
            self.openPort();
        }
    });

    // net.Socket signals an established connection with 'connect'; it has no
    // 'open' event, so this is the only place the port becomes idle.
    self.socket.on('connect', function () {
        // a close requested while connecting takes precedence
        if (self.portState !== PORT_STATE.closing) {
            self.portState = PORT_STATE.idle;
        }
        if (self.verbose) { console.log("#connect."); }
        reportInitResult(true);
        // send whatever was queued while the connection was being established
        if (self.portState === PORT_STATE.idle && self.queue.length) {
            self.transmit();
        }
    });

    self.socket.on('error', function (error) {
        if (self.verbose) { console.log('#error:' + error); }
        reportInitResult(false);
    });

    trace('data', "#data.");
    trace('drain', "#drain.");
    trace('end', '#end.');
    trace('timeout', '#timeout.');
    trace('lookup', '#lookup.');
    trace('close', '#connection closed');
} // end of class serialSocket
module.exports = SerialSocket;
app.js
/*jslint node : true, vars: true, plusplus: true, devel: true, nomen: true, indent: 4, maxerr: 50, globalstrict: true*/ /*global require*/
"use strict";
var SerialSock = require('./serialSocket');
// Demo client: connects to 127.0.0.1:9992 and, once the connection attempt
// has been reported as successful, queues two requests and prints each reply.
var serialSock = new SerialSock({ port : 9992, host : '127.0.0.1', verbose : false}, function (connected) {
    // Shared reply handler: print whatever comes back for a request.
    function printReply(reply) {
        console.log(reply);
    }

    if (!connected) {
        console.log ('failed to connect!!!');
        return;
    }

    // Make as many requests as you want; they are queued in an array.
    serialSock.enqueue('{"some data}', printReply);
    serialSock.enqueue('{"some new data}', printReply);
});