I'm currently using a child process in order to run some computational functions in a non-blocking way.
I'm running into an issue, where by I need to be able to tie the result of a computation back to the correct callback in the parent process. For example:
var cp = require('child_process');
var n = cp.fork(__dirname + '/child.js');
exports.evaluate = function(data, callback) {
n.send({ data : data});
n.once('message', function(result) {
callback(result);
}
}
The problem with this is example is that if one computation returns before another then it will not receive the correct results.
Is there any way to use a custom event name instead of 'message' in order to ensure that each time the evaluate function is called it creates a unique listener that will be removed once it is called?
How can I emit a custom event from with a child process?
Thanks in advance!
Create a a new EventEmitter with new events.EventEmitter. Then emit an event based on any message with eventType:"someType" you receive.
i.e. something like
events = require("events")
emitter = new events.EventEmitter
n.on("message", function(msg) { emitter.emit(msg.eventType,msg.body) })
Then bind listeners (with once or on) to that event emitter.
I think a nice way might be to call your evaluate function recursively until all computations are complete. Use a queue for your computations and process them in a FIFO manner.
var computations = [];
You would need to ditch the n.once('message' and outside your evalutate function define a n.on('message' handler. With each 'message' you receive that matches your evaluation data type (let's call it 'result'). Then you'd save that result in a FIFO queue of results. After which, you'd check if there are more computations, and call evaluate again. This can be cleaned up to splice off the oldest computation in the evaluate function if you'd like.
var results = [];
n.on('message', function(m) {
if(m.msg === "result") { // message looks like {msg: "result", data: 1234}
results.push(m.data);
}
if(computations.length > 0) {
var comp = computations[0]; // Save oldest computation
computations.splice(0,1); // Remove oldest computation from array
evaluate(comp); // Evaluate oldest computation
}
else
process_results(); // if you have 0 computations left,
// you got the last result so go process them.
});
Before whatever is calling evaluate, you should push your computation into the queue, and send the oldest to evaluate.
if(computations.length > 0) {
computations.push(new_comp); // Push on your newest compuation
var comp = computations[0]; // Save oldest computation
computations.splice(0,1); // Remove oldest computation from array
evaluate(comp); // Evaluate oldest computation
}
else {
evaluate(new_comp); // Evaluate computation
}
evaluate(new_comp); // Evaluate oldest computation
Your "DONE" Flag will be when computations.length === 0. And that will just naturally take care of itself with the above code. Any time you have 0 computations left in the n.on('message', handler.. you can call your results processing function:
To process your results it's a simple for loop.
function process_results() {
if(results.length > 0)
for(x in results)
console.log("New result: "+x);
}