I am trying to use fibers with streams:
var Fiber = require('fibers');
var Future = require('fibers/future');
var fs = require('fs');

function sleepForMs(ms) {
  var fiber = Fiber.current;
  setTimeout(function() {
    fiber.run();
  }, ms);
  Fiber.yield();
}

function catchError(f, onError) {
  return function () {
    var args = arguments;
    var run = function () {
      var ret;
      try {
        ret = f.apply(null, args);
      }
      catch (e) {
        onError(e);
      }
      return ret;
    };
    if (Fiber.current) {
      return run();
    }
    else {
      return Fiber(run).run();
    }
  };
}

function processFile(callback) {
  var count, finished, onData, onException, onIgnoredEntry;
  count = 0;
  finished = false;
  onException = function (error) {
    if (finished) {
      console.error("Exception thrown after already finished:", error.stack || error);
      return;
    }
    finished = true;
    return callback(error);
  };
  onData = function(data) {
    console.log("onData");
    if (finished) {
      return;
    }
    console.log("before sleep");
    sleepForMs(500);
    console.log("after sleep");
    throw new Error("test");
  };
  return fs.createReadStream('test.js')
    .on('data', catchError(onData, onException))
    .on('end', function() {
      console.log("end");
      if (finished) {
        return;
      }
      finished = true;
      return callback(null, count);
    })
    .on('error', function(error) {
      console.log("error", error);
      if (finished) {
        return;
      }
      finished = true;
      return callback(error);
    });
}

Fiber(function () {
  console.log("Calling processFile");
  Future.wrap(processFile)().wait();
  console.log("processFile returned");
}).run();
console.log("back in main");
But it does not really work: the data callback returns before the fiber running inside it finishes. So the above code outputs:
Calling processFile
back in main
onData
before sleep
end
processFile returned
after sleep
Exception thrown after already finished: Error: test
When in fact it should be something more like:
Calling processFile
back in main
onData
before sleep
after sleep
end
processFile returned
Error: test
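To isolate the problem from streams, here is a minimal sketch (my own reduction, using only the standard fibers API) of the underlying behavior: yielding inside an event handler returns control to the event loop, so later events fire before the fiber resumes:

var Fiber = require('fibers');
var EventEmitter = require('events').EventEmitter;

var emitter = new EventEmitter();
emitter.on('tick', function(n) {
  Fiber(function() {
    console.log("handler start", n);
    var fiber = Fiber.current;
    setTimeout(function() {
      fiber.run();
    }, 100);
    Fiber.yield(); // control goes back to the event loop here
    console.log("handler end", n);
  }).run();
});
emitter.emit('tick', 1);
emitter.emit('tick', 2);
// prints: handler start 1, handler start 2, handler end 1, handler end 2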
Here's an implementation using wait.for (a wrapper around Fibers): https://github.com/luciotato/waitfor
In this implementation, a fiber is launched for each data chunk, so "n" tasks are launched in parallel. processFile does not "return" until all fibers complete.
This is a demo of how you can do this with Fibers & wait.for; of course, you should encapsulate the module-level vars and all the functions in a class before using this in production.
var wait = require('wait.for');
var fs = require('fs');

var tasksLaunched = 0;
var finalCallback;
var callbackDone = false;
var dataArr = [];

function sleepForMs(ms, sleepCallback) {
  setTimeout(function() {
    return sleepCallback();
  }, ms);
}

function resultReady(err, data) {
  if (err) {
    callbackDone = true;
    return finalCallback(err);
  }
  dataArr.push(data);
  if (dataArr.length >= tasksLaunched && !callbackDone) {
    callbackDone = true;
    return finalCallback(null, dataArr);
  }
}

function processChunk(data, callback) {
  var ms = Math.floor(Math.random() * 1000);
  console.log('waiting', ms);
  wait.for(sleepForMs, ms); // yields this fiber until the timeout fires
  console.log(data.length, "chars");
  return callback(null, data.length);
}

function processFile(filename, callback) {
  var count, onData, onException, onIgnoredEntry;
  count = 0;
  finalCallback = callback;
  onException = function (error) {
    if (!callbackDone) {
      callbackDone = true;
      return callback(error);
    }
  };
  onData = function(data) {
    console.log("onData");
    tasksLaunched++;
    wait.launchFiber(processChunk, data, resultReady); // one fiber per chunk
  };
  fs.createReadStream(filename)
    .on('data', onData)
    .on('end', function() {
      console.log("end");
    })
    .on('error', function(error) {
      console.log("error", error);
      if (!callbackDone) {
        callbackDone = true;
        return callback(error);
      }
    });
}

function mainFiber() {
  console.log("Calling processFile");
  var data = wait.for(processFile, '/bin/bash');
  console.log(data.length, "results");
  console.log("processFile returned");
}

// MAIN
wait.launchFiber(mainFiber);
console.log("back in main");
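One caveat with the code above: resultReady fires the final callback as soon as dataArr.length catches up with tasksLaunched, so if an early chunk finishes processing before the next 'data' event is emitted, processFile can return too soon; results are also collected in completion order, not stream order. A variation (a sketch with hypothetical names onDataOrdered and streamEnded, same wait.for API) that waits for 'end' and preserves chunk order could look like this:

var pending = 0;
var streamEnded = false; // set to true in the stream's 'end' handler

// Replaces onData and resultReady above: each chunk remembers its index,
// and the final callback fires only once 'end' has been seen AND every
// launched fiber has reported back.
function onDataOrdered(data) {
  var index = tasksLaunched++;
  pending++;
  wait.launchFiber(processChunk, data, function(err, result) {
    if (err) {
      if (!callbackDone) {
        callbackDone = true;
        finalCallback(err);
      }
      return;
    }
    dataArr[index] = result; // store by chunk index, not completion order
    pending--;
    if (streamEnded && pending === 0 && !callbackDone) {
      callbackDone = true;
      finalCallback(null, dataArr);
    }
  });
}

The 'end' handler then becomes:

.on('end', function() {
  streamEnded = true;
  if (pending === 0 && !callbackDone) {
    callbackDone = true;
    finalCallback(null, dataArr);
  }
})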
Reduce the sleep time, and set priorities or timers on the other blocks as well, so that after a certain time limit the blocks are displayed according to their priority. That is how you can get the output in your desired order.
Looks like no one knows how to do what you're asking.
In this case, you could process your stream in some traditional asynchronous way, applying your yielding function to the result.
Here are some examples of how to do so.
raw-body
One solution is to collect all stream data before processing any of it. This can easily be done with the raw-body module:
var rawBody = require('raw-body');
var fs = require('fs');
var Fiber = require('fibers');
// sleepForMs is the same fiber-based helper as in the question

function processData(data) {
  console.log("before sleep");
  sleepForMs(500);
  console.log("after sleep");
}

function processFile(callback) {
  var stream = fs.createReadStream('fiber.js');
  rawBody(stream, function(err, data) {
    if (err) return callback(err);
    Fiber(processData).run(data); // process your data in a new fiber
    callback(); // returns immediately, not waiting for processData
  });
}
Using this example you will:
- collect all data from the stream before processing any of it
- run processData in a new Fiber
- return control to the main thread without waiting for processData to finish

If you want, you may add try ... catch or any other exception handling to prevent processData from crashing your app.
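For example, such a guard might look like this (a sketch; what you do on failure is up to you):

function processData(data) {
  try {
    console.log("before sleep");
    sleepForMs(500);
    console.log("after sleep");
    // ... actual processing of `data` ...
  } catch (e) {
    // keep one bad chunk from crashing the whole app
    console.error("processData failed:", e.stack || e);
  }
}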
But if you really want to process all chunks of data the moment they arrive, you could use a smart control-flow module. Here is an example using the queue feature from the async module:
var async = require('async');
var fs = require('fs');
var Fiber = require('fibers');

function processChunk(data, next) {
  return function() {
    console.log("before sleep");
    sleepForMs(500);
    console.log("after sleep");
    next();
  };
}

function processFile(callback) {
  var q = async.queue(function(data, next) {
    Fiber(processChunk(data, next)).run();
  }, 1);
  fs.createReadStream('fiber.js').on('data', function(data) {
    q.push(data);
  }).on('error', function(err) {
    callback(err);
  }).on('end', function() {
    callback(); // not waiting for the queue to drain
  });
}
Using this example you will:
- read the stream, pushing each new chunk onto the processing queue
- process each chunk in its own Fiber (via processChunk), one at a time
- return control to the main thread the moment the stream is closed, not waiting for the data chunks to be processed (a drain-based variation is sketched below)

I know that it's not what you've asked for, but I hope it'll help you.