I have a few functions, the first one gets the a value from yahoo finance server, then i'm updating the DB with this value and i'm doing this using async.queue
and async.waterfall
The thing is that my DB connection never closed, I'm new to node.js so if you'll be able to give some examples i'll really appreciate it. Here is my code:
var request = require('request');
var cheerio = require ('cheerio');
var fs = require('fs');
var MongoClient = require('mongodb').MongoClient;
var async = require('async');
var dbName = "ystocks";
var port = "27017";
var requiredCollection = "stocks"
var host = "localhost";
MongoClient.connect("mongodb://" + host + ":" + port + "/" + dbName, function (error, db){
console.log("Connection is opened to : " + "mongodb://" + host + ":" + port + "/" + dbName);
var q = async.queue(function (doc, callback) {
// code for your update
var stockName = doc.ticker;
var stockValue = doc.value;
var yUrl = "http://finance.yahoo.com/q/ks?s=" + stockName;
console.log("The url is : " + yUrl);
updateStock(doc.ticker, yUrl, db, function(error) {
if (error) {
console.log("There was an error");
console.error(error);
}
else {
console.log("Done updating");
}
})
}, Infinity);
var cursor = db.collection(requiredCollection).find();
cursor.each(function(err, doc) {
if (err) throw err;
if(doc!=null) {
q.push(doc); // dispatching doc to async.queue
}
});
q.drain = function() {
if (cursor.isClosed()) {
console.log('all items have been processed');
// console.log("printinhg thr array:")
// for (var i=0; i<arr2.length; i++) {
// console.log(arr2[i]);
// }
db.close();
}
}
}); // end of connection to MongoClien
function updateStock(stockName, yUrl, db, callback) {
async.waterfall([
function getStockvalue(getStockvalueCallback) {
request(yUrl, stockName, function (error, response, body) {
var date = setTimeToLocal(new Date(), 3);
if (!error && response.statusCode == 200) {
var $ = cheerio.load(body);
// the keys - We get them from a certain class attribute
var span = $('.time_rtq_ticker>span');
var stockValue = $(span).text();
// parsing the value to a number in case it was a String
var parsedValue = parseFloat(stockValue);
// console.log("checking the type of the stockValue " + typeof(parsedValue) + " " + parsedValue);
// Calling the setStockValue function which will update the stock value
// setStockValue(stockName, parsedValue, callback, db);
console.log("Response received from -> " + yUrl);
console.log(date);
console.log("Stock - " + stockName + " --> " + stockValue );
getStockvalueCallback (null, stockValue );
}//end of !error && response.statusCode == 200
else if (response.statusCode == 404){
console.log("Response failed from " + yUrl + " --> error code: " + response.statusCode);
getStockvalueCallback(error);
}//end of statusCode == 400
}); // end of request
}, // end of getStockvalue
function setStockValueInDB (stockValue,setStockValueInDBCallback) {
var query = {'ticker' : stockName};
var operator ={'$set' : {'value' : stockValue}};
// update a specific documnet
db.collection('stocks').update(query, operator, callback);
console.log(stockName + " Updated successfully")
setStockValueInDBCallback(null);
} // end of setStockValueInDB
], callback); // end of waterfall
} // end of updateStock
// Gets the local date and the desired offset time
// set
function setTimeToLocal(date, offset ) {
// getting the local tome in millseconds
var localtime = date.getTime();
// getting the local offset in millseconds
var localOffset = date.getTimezoneOffset()*60000;
var utc = localOffset + localtime;
// Jerusalem offset
// var offset = 3;
// Jerusalem time in millseconds
var jerusalem = utc + (offset*3600000);
var d = new Date(jerusalem);
//console.log("Jerusalem Local Time: " + d.toLocaleString());
return d;
} // end of SetTimeToLocal
Ok so i had a problem with passing the callback to the update function i changed it, here is my new working code:
var request = require('request');
var cheerio = require ('cheerio');
var fs = require('fs');
var MongoClient = require('mongodb').MongoClient;
var async = require('async');
var dbName = "ystocks";
var port = "27017";
var requiredCollection = "stocks"
var host = "localhost";
MongoClient.connect("mongodb://" + host + ":" + port + "/" + dbName, function (error, db){
console.log("Connection is opened to : " + "mongodb://" + host + ":" + port + "/" + dbName);
var q = async.queue(function (doc, callback) {
// code for your update
var stockName = doc.ticker;
var stockValue = doc.value;
var yUrl = "http://finance.yahoo.com/q/ks?s=" + stockName;
console.log("The url is : " + yUrl);
updateStock(doc.ticker, yUrl, db,callback, function(error) {
if (error) {
console.log("There was an error");
console.error(error);
}
else {
console.log("Done updating");
}
})
}, Infinity);
var cursor = db.collection(requiredCollection).find();
cursor.each(function(err, doc) {
if (err) throw err;
if(doc!=null) {
q.push(doc); // dispatching doc to async.queue
}
});
q.drain = function() {
if (cursor.isClosed()) {
console.log('all items have been processed');
// console.log("printinhg thr array:")
// for (var i=0; i<arr2.length; i++) {
// console.log(arr2[i]);
// }
db.close();
}
}
}); // end of connection to MongoClien
function updateStock(stockName, yUrl, db, callback, cb) {
async.waterfall([
function getStockvalue(getStockvalueCallback) {
request(yUrl, stockName, function (error, response, body) {
var date = setTimeToLocal(new Date(), 3);
if (!error && response.statusCode == 200) {
var $ = cheerio.load(body);
// the keys - We get them from a certain class attribute
var span = $('.time_rtq_ticker>span');
var stockValue = $(span).text();
// parsing the value to a number in case it was a String
var parsedValue = parseFloat(stockValue);
// console.log("checking the type of the stockValue " + typeof(parsedValue) + " " + parsedValue);
// Calling the setStockValue function which will update the stock value
// setStockValue(stockName, parsedValue, callback, db);
console.log("Response received from -> " + yUrl);
console.log(date);
console.log("Stock - " + stockName + " --> " + stockValue );
getStockvalueCallback (null, parsedValue);
}//end of !error && response.statusCode == 200
else if (response.statusCode == 404){
console.log("Response failed from " + yUrl + " --> error code: " + response.statusCode);
getStockvalueCallback(error);
}//end of statusCode == 400
}); // end of request
}, // end of getStockvalue
function readFile(parsedValue, readFileCallback) {
fs.readFile('stocktest.json', function(error, file) {
if (error) {
console.error(error);
readFileCallback(error);
}
else {
console.log("The value from readFile function -- " + parsedValue );
readFileCallback (null, file, parsedValue);
}
}); // end of fs.readFile
}, // end of readfile
function processFile(file, parsedValue, processFileCallback) {
var stocksJson = JSON.parse(file);
if (stocksJson[stockName]!=null) {
console.log(stockName+" price : " + stocksJson[stockName].price);
console.log("changing the value...")
stocksJson[stockName].price = parsedValue;
console.log("Price after the change has been made -- " + stocksJson[stockName].price);
console.log("printing the the Json.stringify")
console.log(JSON.stringify(stocksJson, null, 4));
fs.writeFile('stocktest.json',JSON.stringify(stocksJson, null, 4) , function(error) {
if(!error) {
console.log("File Successfully Written");
console.log("The value from processFile func -- " + parsedValue);
processFileCallback(null, parsedValue);
}
if (error) {
console.error(error);
processFileCallback(error);
}
}); // end of writeFile
}
else {
console.log(stockName + " doesn't exist on the json");
processFileCallback(null);
}
}, // end of processFile
function setStockValueInDB (parsedValue,setStockValueInDBCallback) {
var query = {'ticker' : stockName};
var operator ={'$set' : {'value' : parsedValue}};
// update a specific documnet
db.collection('stocks').update(query, operator, callback);
console.log(stockName + " Updated successfully")
setStockValueInDBCallback(null);
} // end of setStockValueInDB
], cb); // end of waterfall
} // end of updateStock
// Gets the local date and the desired offset time
// set
function setTimeToLocal(date, offset ) {
// getting the local tome in millseconds
var localtime = date.getTime();
// getting the local offset in millseconds
var localOffset = date.getTimezoneOffset()*60000;
var utc = localOffset + localtime;
// Jerusalem offset
// var offset = 3;
// Jerusalem time in millseconds
var jerusalem = utc + (offset*3600000);
var d = new Date(jerusalem);
//console.log("Jerusalem Local Time: " + d.toLocaleString());
return d;
} // end of SetTimeToLocal