Updating documents in mongoDB using node.js

I have a few functions, the first one gets the a value from yahoo finance server, then i'm updating the DB with this value and i'm doing this using async.queue and async.waterfall The thing is that my DB connection never closed, I'm new to node.js so if you'll be able to give some examples i'll really appreciate it. Here is my code:

var request = require('request');
var cheerio = require ('cheerio');
var fs = require('fs');
var MongoClient = require('mongodb').MongoClient;
var async  =  require('async');

var dbName = "ystocks";
var port = "27017";
var requiredCollection = "stocks"
var host = "localhost";


MongoClient.connect("mongodb://" + host + ":" + port + "/" + dbName, function (error, db){

    console.log("Connection is opened to : " + "mongodb://" + host + ":" + port + "/" + dbName);

    var q = async.queue(function (doc, callback) {
  // code for your update

            var stockName = doc.ticker;
            var stockValue =  doc.value;

            var yUrl = "http://finance.yahoo.com/q/ks?s=" + stockName;
            console.log("The url is : " + yUrl);


            updateStock(doc.ticker, yUrl, db, function(error) {
            if (error) {
                console.log("There was an error");
                console.error(error);

            }
            else {
                console.log("Done updating");

            }
            }) 



    }, Infinity);

var cursor = db.collection(requiredCollection).find();
cursor.each(function(err, doc) {
  if (err) throw err;
  if(doc!=null) {
  q.push(doc); // dispatching doc to async.queue
} 
});

q.drain = function() {
  if (cursor.isClosed()) {
    console.log('all items have been processed');
    // console.log("printinhg thr array:") 
    // for (var i=0; i<arr2.length; i++) {
    //  console.log(arr2[i]);
    // }
    db.close();
  }
}

 }); // end of connection to MongoClien





function updateStock(stockName, yUrl, db, callback) {
    async.waterfall([

        function getStockvalue(getStockvalueCallback) {
            request(yUrl, stockName,  function (error, response, body) {
                var date  = setTimeToLocal(new Date(), 3);
                if (!error && response.statusCode == 200) {         
                    var $ =  cheerio.load(body);                    

                    // the keys - We get them from a certain class attribute
                    var span =  $('.time_rtq_ticker>span');
                    var stockValue = $(span).text();

                    // parsing the value to a number in case it was a String
                    var parsedValue = parseFloat(stockValue);
                //  console.log("checking the type of the stockValue " +  typeof(parsedValue) + " " + parsedValue);

                    // Calling the setStockValue function which will update the stock value
                //  setStockValue(stockName, parsedValue, callback, db);

                    console.log("Response received from -> " + yUrl);
                    console.log(date);
                    console.log("Stock  - " + stockName + " --> " + stockValue ); 
                    getStockvalueCallback (null, stockValue );

                }//end of !error && response.statusCode == 200
                else if (response.statusCode == 404){
                    console.log("Response failed from " + yUrl + " --> error code:  " +  response.statusCode);
                    getStockvalueCallback(error);
                }//end of statusCode == 400
            }); // end of request 

        }, // end of getStockvalue

        function setStockValueInDB (stockValue,setStockValueInDBCallback) {
            var query = {'ticker' : stockName};
            var operator ={'$set' : {'value' : stockValue}};
            // update a specific documnet
            db.collection('stocks').update(query, operator, callback);
            console.log(stockName + " Updated successfully")
            setStockValueInDBCallback(null);

        } // end of setStockValueInDB

        ], callback); //  end of waterfall


} // end of updateStock


// Gets the local date and the desired offset time
    // set 
    function setTimeToLocal(date, offset ) {
        // getting the local tome in millseconds 
            var localtime = date.getTime();

            // getting the local offset in millseconds
            var localOffset = date.getTimezoneOffset()*60000;

            var utc = localOffset + localtime;

            // Jerusalem offset
            //  var offset = 3;

            // Jerusalem time in millseconds
            var jerusalem =  utc + (offset*3600000);
            var d = new Date(jerusalem);
            //console.log("Jerusalem Local Time: " + d.toLocaleString());
            return d;
    } // end of SetTimeToLocal

Ok so i had a problem with passing the callback to the update function i changed it, here is my new working code:

var request = require('request');
var cheerio = require ('cheerio');
var fs = require('fs');
var MongoClient = require('mongodb').MongoClient;
var async  =  require('async');

var dbName = "ystocks";
var port = "27017";
var requiredCollection = "stocks"
var host = "localhost";


MongoClient.connect("mongodb://" + host + ":" + port + "/" + dbName, function (error, db){

    console.log("Connection is opened to : " + "mongodb://" + host + ":" + port + "/" + dbName);

    var q = async.queue(function (doc, callback) {
  // code for your update

            var stockName = doc.ticker;
            var stockValue =  doc.value;

            var yUrl = "http://finance.yahoo.com/q/ks?s=" + stockName;
            console.log("The url is : " + yUrl);


            updateStock(doc.ticker, yUrl, db,callback, function(error) {
            if (error) {
                console.log("There was an error");
                console.error(error);

            }
            else {
                console.log("Done updating");

            }
            }) 



    }, Infinity);

var cursor = db.collection(requiredCollection).find();
cursor.each(function(err, doc) {
  if (err) throw err;
  if(doc!=null) {
  q.push(doc); // dispatching doc to async.queue
} 
});

q.drain = function() {
  if (cursor.isClosed()) {
    console.log('all items have been processed');
    // console.log("printinhg thr array:") 
    // for (var i=0; i<arr2.length; i++) {
    //  console.log(arr2[i]);
    // }
    db.close();
  }
}

 }); // end of connection to MongoClien





function updateStock(stockName, yUrl, db, callback, cb) {
    async.waterfall([

        function getStockvalue(getStockvalueCallback) {
            request(yUrl, stockName,  function (error, response, body) {
                var date  = setTimeToLocal(new Date(), 3);
                if (!error && response.statusCode == 200) {         
                    var $ =  cheerio.load(body);                    

                    // the keys - We get them from a certain class attribute
                    var span =  $('.time_rtq_ticker>span');
                    var stockValue = $(span).text();

                    // parsing the value to a number in case it was a String
                    var parsedValue = parseFloat(stockValue);
                //  console.log("checking the type of the stockValue " +  typeof(parsedValue) + " " + parsedValue);

                    // Calling the setStockValue function which will update the stock value
                //  setStockValue(stockName, parsedValue, callback, db);

                    console.log("Response received from -> " + yUrl);
                    console.log(date);
                    console.log("Stock  - " + stockName + " --> " + stockValue ); 
                    getStockvalueCallback (null, parsedValue);

                }//end of !error && response.statusCode == 200
                else if (response.statusCode == 404){
                    console.log("Response failed from " + yUrl + " --> error code:  " +  response.statusCode);
                    getStockvalueCallback(error);
                }//end of statusCode == 400
            }); // end of request 

        }, // end of getStockvalue
        function readFile(parsedValue, readFileCallback) {
            fs.readFile('stocktest.json', function(error,  file) {
            if (error) {
                console.error(error);
                readFileCallback(error);
            }
            else {
                console.log("The value from readFile function -- " + parsedValue );
                readFileCallback (null, file, parsedValue);
            }
            }); // end of fs.readFile

        }, // end of readfile
        function processFile(file, parsedValue, processFileCallback) {
            var stocksJson =  JSON.parse(file);

            if (stocksJson[stockName]!=null) {
                console.log(stockName+" price : " + stocksJson[stockName].price);
                console.log("changing the value...")
                stocksJson[stockName].price =  parsedValue;
                console.log("Price after the change has been made -- " + stocksJson[stockName].price);
                console.log("printing the the Json.stringify")
                console.log(JSON.stringify(stocksJson, null, 4));
                 fs.writeFile('stocktest.json',JSON.stringify(stocksJson, null, 4) , function(error) {  
                                if(!error) {
                                    console.log("File Successfully Written");
                                    console.log("The value from processFile func -- " + parsedValue);
                                    processFileCallback(null, parsedValue);
                                }
                                if (error) {
                                    console.error(error);
                                    processFileCallback(error);
                                }

                           }); // end of writeFile
            }
            else {
                console.log(stockName + " doesn't exist on the json");
                processFileCallback(null);
            }
        }, //  end of processFile


        function setStockValueInDB (parsedValue,setStockValueInDBCallback) {
            var query = {'ticker' : stockName};
            var operator ={'$set' : {'value' : parsedValue}};
            // update a specific documnet
            db.collection('stocks').update(query, operator, callback);
            console.log(stockName + " Updated successfully")
            setStockValueInDBCallback(null);

        } // end of setStockValueInDB

        ], cb); //  end of waterfall


} // end of updateStock


// Gets the local date and the desired offset time
    // set 
    function setTimeToLocal(date, offset ) {
        // getting the local tome in millseconds 
            var localtime = date.getTime();

            // getting the local offset in millseconds
            var localOffset = date.getTimezoneOffset()*60000;

            var utc = localOffset + localtime;

            // Jerusalem offset
            //  var offset = 3;

            // Jerusalem time in millseconds
            var jerusalem =  utc + (offset*3600000);
            var d = new Date(jerusalem);
            //console.log("Jerusalem Local Time: " + d.toLocaleString());
            return d;
    } // end of SetTimeToLocal