I am reading many files that contain a mixture of objects. I have one root stream that streams objects like:
// root stream
{"action": "action_type_a", "id": "1234"}
{"action": "action_type_b", id: "24566"}
{"action": "action_type_b", id: "0808098"}
{"action": "action_type_b", id: "8098098"}
{"action": "action_type_a", "id": "098098"}
{"action": "action_type_b", id: "08098"}
I pipe the root stream into two separate grep-filtered streams. One filters for action_type_a; we'll call it A, and it looks like:
// stream A
{"action": "action_type_a", "id": "1234"}
{"action": "action_type_a", "id": "098098"}
The other filters for action_type_b; we'll call it B, and it looks like:
// stream B
{"action": "action_type_b", id: "24566"}
{"action": "action_type_b", id: "0808098"}
{"action": "action_type_b", id: "8098098"}
{"action": "action_type_b", id: "08098"}
I also have a separate map of ids:
// id map
{
"1234" : "098098",
""098098" : "8098098"
}
so that ids on A objects can be matched with ids on B objects.
I want to process stream A, compare each object's id against the map, and then check whether a corresponding object exists in stream B.
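In other words, for each object from A I want something like this (a minimal sketch; idMap and aObject are just placeholder names for the map and a parsed A record):

// per-object lookup (sketch)
var bId = idMap[aObject.id]; // e.g. idMap["1234"] === "098098"
// ...then find the object in stream B whose id is bId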
I"m not sure how to handle this without draining stream B before processing every object in A.
My first thought was to clone B for each entry in A. Is that possible?
My current attempt uses event-stream's es.map on stream A. Inside the map function I try to read B, compare each of its records with the current A object, and write each record back to B as I read it. This, however, produces:
(node) warning: possible EventEmitter memory leak detected. 11 listeners added. Use emitter.setMaxListeners() to increase limit.
This warning appears because I re-attach the listeners for B inside the map over A, once per record.
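Stripped down, the failing attempt looks roughly like this (a simplified sketch of my code; error handling and the write-back to B are elided):

// simplified version of the current attempt
A.pipe(es.map(function (aLine, cb) {
  var aObject = JSON.parse(aLine);
  var bId = idMap[aObject.id];

  // these handlers get attached again for every A record,
  // which is what triggers the EventEmitter warning
  B.on('data', function (bLine) {
    var bObject = JSON.parse(bLine);
    if (bObject.id === bId) {
      // ... process the matched pair and write the record back to B ...
    }
  });
  B.on('end', function () {
    cb(null, aLine); // pass the A record downstream
  });
}));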