I am trying to use the aggregation pipeline to create a new date from values produced earlier in the pipeline and save the results to a new collection (see my pipeline below). However, the syntax is wrong and I get this error:
disallowed field type Date in object expression (at 'date') // date: new Date('$_id.year', '$_id.month', '$_id.day')
How can I construct a new date from my year, month, and day values inside a MongoDB aggregation pipeline? Basically, after breaking my ISODate into year, month, and day for grouping, I want to transform those parts back into ISODate format.
pipeline = [{
$match: {
event: 'sample_event',
}
}, {
$project: {
_id: false,
uuid: '$properties.distinct_id',
time: '$properties.time'
}
}, {
$group: {
_id: {
year: {
$year: '$time'
},
month: {
$month: '$time'
},
day: {
$dayOfMonth: '$time'
},
uuid: '$uuid'
}
}
}, {
$group: {
_id: {
year: '$_id.year',
month: '$_id.month',
day: '$_id.day'
},
value: { $sum: 1 }
}
}, {
$sort: {
'_id.year': 1,
'_id.month': 1,
'_id.day': 1
}
}, {
$project: {
_id: {
$concat: [
{ $substr: ['$_id.year', 0, 4] },
'-',
{
$cond: [
{ $lte: [ '$_id.month', 9 ] },
{ $concat: [
'0',
{ $substr: [ '$_id.month', 0, 2 ] }
]},
{ $substr: [ '$_id.month', 0, 2 ] }
]
},
'-',
{
$cond: [
{ $lte: [ '$_id.day', 9 ] },
{ $concat: [
'0',
{ $substr: [ '$_id.day', 0, 2 ] }
]},
{ $substr: [ '$_id.day', 0, 2 ] }
]
},
]
},
    date: new Date('$_id.year', '$_id.month', '$_id.day'), // <-- the line that errors
value: 1
}
}, {
$out: 'output_collection'
}];
You cannot "cast" new data types in the aggregation pipeline. The only thing really allowed is using $substr
on integer values ( not doubles ) and extracting the timestamp value as an integer from a date. But strings or numbers cannot become either numbers from strings or date objects.
Also, no JavaScript is actually evaluated in the pipeline. Any examples you might see are purely a "one way trip": the JavaScript expression is evaluated on the client while the pipeline document is being constructed, before anything is sent to the server. You cannot have a function act on data "within" the pipeline in any way.
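As a minimal sketch of that "one way trip", consider how a Date inside a pipeline stage is handled:

// The Date here is evaluated by the client while this object literal is
// built, *before* anything is sent to the server. The server only ever
// sees a constant BSON Date value, never a JavaScript expression.
var stage = {
    "$project": {
        "diff": { "$subtract": [ "$time", new Date("1970-01-01") ] }
    }
};
// What the server receives is effectively:
// { "$project": { "diff": { "$subtract": [ "$time", ISODate("1970-01-01T00:00:00Z") ] } } }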
Your best approach to writing a new collection is to process the cursor results and write them out using the Bulk Operations API. If this is intended for periodically rebuilding aggregated results, you can even append to or update the target collection (see the upsert variant after the listing below).
This means using methods from the underlying native driver. Getting at those with mongojs looks a little funny, but they are still there:
var mongojs = require('mongojs'),
    db = mongojs('mongodb://localhost/test');

// mongojs wraps the native driver; ._get() hands back the underlying
// native db object so the full Bulk Operations API is available.
db._get(function(err,db) {
    if (err) throw err;

    var source = db.collection('source'),
        bulk = db.collection('target').initializeOrderedBulkOp(),
        count = 0;

    var cursor = source.aggregate(
        [
            { "$match": { "event": "sample_event" } },
            { "$group": {
                "_id": {
                    // Date math: subtracting one BSON Date from another
                    // yields the difference in milliseconds, so this rounds
                    // the timestamp down to the start of its day.
                    "day": {
                        "$subtract": [
                            { "$subtract": [ "$properties.time", new Date("1970-01-01") ] },
                            { "$mod": [
                                { "$subtract": [ "$properties.time", new Date("1970-01-01") ] },
                                1000 * 60 * 60 * 24
                            ]}
                        ]
                    },
                    "uuid": "$properties.distinct_id"
                }
            }},
            { "$group": { "_id": "$_id.day", "count": { "$sum": 1 } }}
        ],
        { "cursor": { "batchSize": 101 } }
    );

    cursor.on("data",function(data) {
        // The grouped _id is an epoch millisecond value, so it can be fed
        // straight to the Date() constructor here on the client.
        bulk.insert({ "date": new Date(data._id), "count": data.count });
        count++;

        // Send to the server in batches of 1000, pausing the stream while
        // the write is in flight.
        if ( count % 1000 == 0 ) {
            cursor.pause();
            bulk.execute(function(err,result) {
                if (err) throw err;
                bulk = db.collection('target').initializeOrderedBulkOp();
                cursor.resume();
            });
        }
    });

    cursor.on("end",function() {
        // Flush any remaining queued operations.
        if ( count % 1000 != 0 ) {
            bulk.execute(function(err,result) {
                if (err) throw err;
                console.log("done");
            });
        } else {
            console.log("done");
        }
    });
});
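For the periodic append/update case mentioned above, here is a hedged sketch of the same loop body using an upsert instead of a plain insert, so that re-running the job refreshes each existing "day" document rather than duplicating it (collection and field names are the same as in the listing):

// Match on the computed date and upsert, rather than blindly inserting.
bulk.find({ "date": new Date(data._id) })
    .upsert()
    .updateOne({ "$set": { "count": data.count } });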
So that is your aggregation operation greatly simplified, and it runs much faster. The output is an epoch timestamp value representing the "day": subtracting one date object from another yields the timestamp back as an integer, and the modulo arithmetic rounds that number down to the start of the day. This works better than coercing strings, and the result is suitable for feeding to the Date() constructor to produce a new date object, which of course happens "outside" of the aggregation pipeline.
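To make the arithmetic concrete, here is a small client-side illustration of the same rounding the pipeline performs server-side (the sample timestamp is arbitrary):

// e.g. 2015-06-15T13:45:30Z as epoch milliseconds:
var ms     = new Date("2015-06-15T13:45:30Z") - new Date("1970-01-01"); // 1434375930000
var oneDay = 1000 * 60 * 60 * 24;                                       // 86400000
var day    = ms - ( ms % oneDay ); // 1434326400000
console.log(new Date(day));        // 2015-06-15T00:00:00Z, a real Date object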
The inserts here are performed in bulk, only once every thousand items, which is very wire-efficient. Since this is a streaming interface, you work with the events and use .pause() and .resume() to avoid too many concurrent requests and excessive memory usage. Tune as required; whatever size you choose, the driver will break any one send into requests of at most 1000 operations anyway.
Or, of course, just live without casting the value to a date at this stage: use $out to create the collection, then have your code cast the dates from any results it reads back. But you cannot manipulate a date and get a date object returned from within the aggregation pipeline itself. Use whichever approach suits you best.
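As a rough sketch of that alternative (reusing the same native db handle and collection names as above; "output_collection" is taken from the question), the server materializes the epoch values with $out and the client casts on read:

// Materialize the epoch "day" values server-side with $out...
db.collection('source').aggregate(
    [
        { "$match": { "event": "sample_event" } },
        { "$group": {
            "_id": {
                "day": {
                    "$subtract": [
                        { "$subtract": [ "$properties.time", new Date("1970-01-01") ] },
                        { "$mod": [
                            { "$subtract": [ "$properties.time", new Date("1970-01-01") ] },
                            1000 * 60 * 60 * 24
                        ]}
                    ]
                },
                "uuid": "$properties.distinct_id"
            }
        }},
        { "$group": { "_id": "$_id.day", "count": { "$sum": 1 } }},
        { "$out": "output_collection" }
    ],
    function(err) {
        if (err) throw err;
        // ...then cast the stored epoch values back to Date objects on read.
        db.collection('output_collection').find().toArray(function(err, docs) {
            if (err) throw err;
            docs.forEach(function(doc) {
                console.log({ "date": new Date(doc._id), "count": doc.count });
            });
        });
    }
);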