I'm trying to set up a simple document feeder to benchmark ElasticSearch, and I chose Node.js because I thought it would be the easiest way to work with simple JSON structures. Unfortunately, it seems I'm shooting myself in the foot.
Here are the relevant bits:
// Called by elastic.js after each document is indexed; logs progress every 10,000 documents
var logResults = function (results) {
    docsIndexed++;
    var now = +new Date();
    if (docsIndexed % 10000 === 0) {
        log("Indexed " + docsIndexed + " documents in " + (now - start) + "ms");
    }
};

// Wraps a plain JS object in an elastic.js Document and indexes it asynchronously
var submitDocument = function (source, index, type) {
    var doc = ejs.Document(index, type).source(source);
    doc.doIndex(logResults);
};

// Build 10 random schemas, keyed by a random word
var schemas = {};
_(10).times(function (idx) {
    schemas[pickRandomWord()] = generateRandomDocumentSchema(_.random(idx, 15), 10);
});

var docCount = 0, docsIndexed = 0, start = +new Date();

// Submit (idx + 1) * 1000 documents for each schema
Object.keys(schemas).forEach(function (schemaName, idx) {
    var register = function () {
        submitDocument(generateRandomDocument(schemas[schemaName]),
            'documents', schemaName);
        docCount++;
    };
    _((idx + 1) * 1000).times(register);
    log("Registered " + ((idx + 1) * 1000) + " documents for indexing for schema "
        + schemaName + ". Total: " + docCount);
});
This works fine for datasets of up to 100,000 records, but when I go for millions it blows up with an out-of-memory error.
The doIndex function from elastic.js is asynchronous, and my suspicion is that many index requests are being queued up before any of them actually execute; once their number becomes large enough, the process dies. I don't understand why none of the callbacks run before the loop finishes. What I would like is a way either to make this synchronous, or to throttle it with some sort of pooling so that it doesn't queue up more requests before the pending ones have been sent.
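Here is a stripped-down sketch of what I think is happening; nothing in it is specific to elastic.js, and setTimeout just stands in for the asynchronous doIndex call. None of the scheduled callbacks fire until the synchronous loop has finished:
var pending = 0;

for (var i = 0; i < 100000; i++) {
    pending++;
    // Stand-in for doc.doIndex(logResults): the callback is queued,
    // but it cannot run until the current synchronous code returns.
    setTimeout(function () { pending--; }, 0);
}

// Prints 100000: not a single callback has fired while the loop was running.
console.log("loop done, still pending: " + pending);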
Can someone please suggest a library that can help with this or a better way to structure the code? Thanks.
I've tried Peter's suggestion to use async.queue and I've come up with this:
/** Submit QUANT * (idx + 1) documents for each schema into the index */
var QUANT = 100
  , docCount = 0
  , docsIndexed = 0
  , queue = async.queue(submitDocument, 1000)
  , paused = false
  , start = +new Date();

queue.saturated = function () {
    log("queue is full");
    paused = true;
};

queue.empty = function () {
    log("All items were given to workers");
    paused = false;
};

Object.keys(schemas).forEach(function (schemaName, idx) {
    var count = 0;
    while (count < (idx + 1) * QUANT) {
        if (!paused) {
            queue.push({
                source: generateRandomDocument(schemas[schemaName]),
                index: 'documents',
                type: schemaName
            });
            count++; docCount++;
        }
    }
    log("Registered " + count + " documents for indexing for schema "
        + schemaName + ". Total: " + docCount);
});
If it ever pauses inside the loop, it hangs forever: queue.saturated is called, paused is set to true, and the program then spins in the while loop. The queue.empty callback is never called, presumably because the synchronous loop never yields control back to the event loop, so the workers never get a chance to run. This works fine if I set the queue's concurrency limit above the number of documents I want to process: all messages are logged as expected. What should I change here?
I've changed the code to use an async loop and now it works, though along the way I struggled for a while with a RangeError: Maximum call stack size exceeded.
Object.keys(schemas).forEach(function (schemaName, idx) {
    var count = 0, executions = 0;
    async.whilst(
        function () {
            var test = count < (idx + 1) * QUANT;
            if (!test) log("Registered " + count + " documents for indexing for schema "
                + schemaName + ". Executions: " + executions + ". Total: " + docCount);
            return test;
        },
        function (callback) {
            executions++;
            if (!paused) {
                queue.push({
                    source: generateRandomDocument(schemas[schemaName]),
                    index: 'documents',
                    type: schemaName
                });
                count++; docCount++;
            }
            // Defer the next iteration so the event loop (and the queue's workers) get a turn.
            setTimeout(callback, 0);
            // also tried with "return process.nextTick(callback)"
            // and "return callback();"
            // this blows up nicely with an out of memory error
        },
        function (err) {}
    );
});
I'm beginning to get frustrated because I don't think this use case is really that complicated and I hope I have a fair understanding of how the language works.
Your best bet is going to be async.queue with a large concurrency limit. Just make sure you don't keep adding items to the queue after it's already saturated: use saturation as back pressure, and wait until some work has been done before queueing up more tasks. The queue has hooks (saturated, empty, drain) for exactly these events.
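As a rough sketch of that pattern, reusing the schemas object, generators and log helper from your question (pickRandomSchemaName is a hypothetical helper and the limits are arbitrary): documents are produced in bounded batches, and the next batch is only generated once the queue has drained, so memory stays bounded no matter how many documents you index in total.
var CONCURRENCY = 1000          // doIndex calls allowed in flight at once
  , BATCH = 1000                // documents generated per refill
  , TOTAL = 1000000             // overall number of documents to index
  , pushed = 0;

// Worker: one task = one document. async.queue runs at most CONCURRENCY of these at a time.
var queue = async.queue(function (task, done) {
    ejs.Document(task.index, task.type).source(task.source).doIndex(function () {
        done();
    });
}, CONCURRENCY);

// Generate at most BATCH documents, then stop and wait for the queue to ask for more.
var refill = function () {
    var n = Math.min(BATCH, TOTAL - pushed);
    for (var i = 0; i < n; i++) {
        var schemaName = pickRandomSchemaName();   // hypothetical helper picking a key of schemas
        queue.push({
            source: generateRandomDocument(schemas[schemaName]),
            index: 'documents',
            type: schemaName
        });
        pushed++;
    }
};

// Back pressure: only generate more once everything pushed so far has been indexed,
// so at most BATCH documents are waiting or in flight at any time.
queue.drain = function () {
    if (pushed < TOTAL) {
        refill();
    } else {
        log("All " + TOTAL + " documents indexed");
    }
};

refill();   // kick things off
If you want to keep the workers busier between batches, you could refill from queue.empty (fired when the last item is handed to a worker) instead of queue.drain; both hooks are available on the queue object.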