NodeJS pipe scoping trouble (multiple files)

Update 2: I've resolved the original problem, but am still curious if anyone can explain why the fix was necessary.


I'm trying to read multiple csv files and post them to a REST API and it looks like I'm having some sort of strange cross-talk. I keep getting errors back from the REST API due to sending data for another object. I wouldn't be surprised if I'm missing something obvious, but I can't seem to find any solutions online.

Here is my code which may make things clearer:

fs = require 'fs'
parse = require 'csv-parse'
transform = require 'stream-transform'
eventStream = require 'event-stream'
request = require 'request'
async = require 'async'

# strip out _key entities, which REST API will refuse
transformer = transform (data, callback) ->
  if data.QUOTE_Key?
    delete data.QUOTE_Key
  else
    delete data.QUOTE_LINE_Key
  callback null, data

# send an individual object to REST API
sendData = (entityId, data, callback) ->
  postConfig = genPostConfig entityId, data
  request.post postConfig, (err, httpResponse, body) ->
    if body.errors?
      console.log "response from #{entityId}: ", body, "\n\tdata:", data,
        "\n\n"
    callback()

# generate parse function -- Maybe new parser object is required for each loop?
genParser = ->
  parser = parse
    columns: true
    delimiter: '|'
    skip_empty_lines: true

# create the request config to send data
genPostConfig = (entityId, data) ->
  uri: "http://localhost:8080/api/v1/#{entityId}"
  'content-type': 'application/json'
  body: JSON.stringify data

# logic to read an input file, parse it, and upload entries
uploadFile = (entityId) ->
  input = fs.createReadStream "#{entityId}.txt"

  input
    .pipe genParser()
    .pipe transformer
    .pipe eventStream.map (data, callback2) ->
      sendData entityId, data, callback2
    .on 'end', callback

# ---------------------------
# begin functional logic here
# ---------------------------
entities = ['QUOTE', 'QUOTE_LINE']
#for entityId in ['QUOTE', 'QUOTE_LINE']
#  uploadFile entityId

async.each entities, (entityId, callback) ->
  uploadFile entityId
  callback()

As you can see, I've attempted both a standard and async for loop through the files. I've also isolated the variables into functions in a hope that it would reduce the cross-talk. However, I am still getting responses where a QUOTE entity is being sent to QUOTE_LINE, and vice versa.

Any ideas?

Update 1: When limited to one file (I've tested against both), the process will complete without error.

Update 2: I've found that the problem was in the creation of the eventStream.map definition. The simplified and working code is as follows:

fs = require 'fs'
parse = require 'csv-parse'
transform = require 'stream-transform'
eventStream = require 'event-stream'
request = require 'request'

tenantId = 'simple'
envId = 'dev'

sendData = (entityId, data, callback) ->
  postConfig =
    uri: "http://localhost:8080/api/v1/#{entityId}"
    'content-type': 'application/json'
    body: JSON.stringify data
  request.post postConfig, (err, httpResponse, body) ->
    {Quote_Id, Line_Id} = data
    objId = "Quote_Id: #{Quote_Id}"
    objId += ", Line_Id: #{Line_Id}" if Line_Id?
    console.log "response from #{entityId}, #{objId}: ", body
    callback()

genSaver = (entityId) ->
  eventStream.writeArray (err, allData) ->
    for data in allData
      sendData entityId, data, ->

uploadFile = (entityId) ->
  input = fs.createReadStream "#{entityId}.txt"
  saver = genSaver entityId

  input
    .pipe parse
      columns: true
      delimiter: '|'
      skip_empty_lines: true
    .pipe transform (data, callback) ->
      delete data.QUOTE_Key if data.QUOTE_Key?
      delete data.QUOTE_LINE_Key if data.QUOTE_LINE_Key?
      callback null, data
    .pipe saver

entities = ['QUOTE', 'QUOTE_LINE']
for entityId in entities
  uploadFile entityId

I would still appreciate any explanations as to why this resolved the issue. I think it has something to do with scoping, but I would have expected the call to sendData to be enough encapsulation.