First-time Stack Overflow questioner here, so I'll try to include as much detail as possible.
I'm trying to get Apache Flume log data transferred through an Avro sink to a Node.js server, listening on a particular port. I intend to use Collective Media's node-avro library to help with serialization between Avro's binary format and JSON, so I can work with the data in Node.js (I'm passing it on to clients via socket.io pub/sub).
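For reference, Avro's binary encoding of primitive values is compact and easy to check by hand. Below is a hand-rolled sketch of the encoding the Avro specification gives for a `"string"` value: a zigzag variable-length long holding the UTF-8 byte count, followed by the bytes themselves. It does not use node-avro, and the function names are made up for illustration. It's only a way to see what raw Avro bytes for a value like `"foo"` should look like, assuming a library emits plain Avro binary with no container framing.

```javascript
// Hypothetical helpers (not node-avro's API): Avro binary encoding of
// "long" and "string" values as described in the Avro specification.

// Encode a signed integer as a zigzag varint (7 bits per byte, low bits first).
function encodeLong(n) {
  var z = n >= 0 ? n * 2 : -n * 2 - 1; // zigzag: 0,-1,1,-2 -> 0,1,2,3
  var bytes = [];
  while (z > 0x7f) {
    bytes.push((z % 128) | 0x80); // low 7 bits, continuation bit set
    z = Math.floor(z / 128);
  }
  bytes.push(z);
  return Buffer.from(bytes);
}

// Decode a zigzag varint starting at offset; returns the value and the next offset.
function decodeLong(buf, offset) {
  var z = 0;
  var multiplier = 1;
  var b;
  do {
    b = buf[offset++];
    z += (b & 0x7f) * multiplier;
    multiplier *= 128;
  } while (b & 0x80);
  var value = z % 2 === 0 ? z / 2 : -(z + 1) / 2; // undo zigzag
  return { value: value, offset: offset };
}

// An Avro string is its UTF-8 byte length (as a long) followed by the bytes.
function encodeString(s) {
  var bytes = Buffer.from(s, 'utf8');
  return Buffer.concat([encodeLong(bytes.length), bytes]);
}

function decodeString(buf, offset) {
  var len = decodeLong(buf, offset);
  var end = len.offset + len.value;
  return { value: buf.toString('utf8', len.offset, end), offset: end };
}

console.log(encodeString('foo').toString('hex')); // prints 06666f6f
```

The output is four bytes: `06` is zigzag(3), the length of `"foo"`, followed by the ASCII bytes `66 6f 6f`.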
I'm pretty sure Flume is configured correctly, since I can see the data flowing through the channel and out to the console (for debugging only, I also have the data going to a console sink). When I enable the Avro sink and bring up the Node.js server listening on the same port, though, Flume throws an exception when it tries to do the Avro transfer:
2013-02-15 22:06:09,858 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
at org.apache.flume.sink.AvroSink.process(AvroSink.java:325)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: localhost, port: 4242 }: Failed to send batch
at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:236)
at org.apache.flume.sink.AvroSink.process(AvroSink.java:309)
... 3 more
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: localhost, port: 4242 }: Exception thrown from remote handler
at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:318)
at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:295)
at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:224)
... 4 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: NettyTransceiver closed
at org.apache.avro.ipc.CallFuture.get(CallFuture.java:128)
at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:310)
... 6 more
Caused by: java.io.IOException: NettyTransceiver closed
at org.apache.avro.ipc.NettyTransceiver.disconnect(NettyTransceiver.java:338)
at org.apache.avro.ipc.NettyTransceiver.access$200(NettyTransceiver.java:59)
at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.handleUpstream(NettyTransceiver.java:496)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:792)
at org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:348)
at org.jboss.netty.handler.codec.frame.FrameDecoder.channelClosed(FrameDecoder.java:236)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:93)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
at org.jboss.netty.channel.Channels.fireChannelClosed(Channels.java:476)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:623)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:101)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:364)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:238)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:38)
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
... 1 more
2013-02-15 22:06:14,895 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:178)] Avro sink k1: Building RpcClient with hostname: 127.0.0.1, port: 4242
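The innermost cause, `NettyTransceiver closed` raised from `NioWorker.read`, suggests the connection was closed from the Node.js end. One plausible reason, assumed here rather than confirmed: an `http` server can't parse binary Avro RPC bytes as an HTTP request, so it reports a `clientError` and the socket is closed without the request handler ever running. This sketch tries that locally with a throwaway server on a random port and eight arbitrary bytes standing in for Flume's traffic.

```javascript
// Sketch: does an http server close a connection that sends non-HTTP bytes?
var http = require('http');
var net = require('net');

var sawClientError = false;

var server = http.createServer(function (req, res) {
  console.log('Request handler ran'); // not expected for binary input
  res.end();
});

// Fired when the bytes on a connection don't parse as HTTP. With a listener
// attached, closing the socket is our job; by default Node closes it itself.
server.on('clientError', function (err, socket) {
  sawClientError = true;
  console.log('clientError: ' + err.message);
  socket.destroy();
});

server.listen(0, '127.0.0.1', function () {
  var port = server.address().port; // random free port chosen by the OS
  var client = net.connect(port, '127.0.0.1', function () {
    // Stand-in for Avro RPC traffic: binary, not an HTTP request line
    client.write(Buffer.from([0, 0, 0, 1, 0, 0, 0, 1]));
  });
  client.on('error', function (err) {
    console.log('client error: ' + err.message);
  });
  client.on('close', function () {
    console.log('Connection closed by server; clientError seen: ' + sawClientError);
    server.close();
  });
});
```

If `clientError` fires and the request handler never runs, that matches a server whose `console.log("Got it!")` never prints while the sender sees its connection closed.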
What I'm not sure about is how to tell whether my Node.js service is even receiving the message. I'm pretty new to Node.js, which doesn't help, but here's a snippet of the code that sets up the listener:
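var http = require('http');
// Must be the same port the Flume Avro sink connects to (4242 in the log above)
var flumeSink = http.createServer(flumeHandler);
flumeSink.listen(4242);
// Called once per successfully parsed HTTP request
function flumeHandler(req, res) {
  console.log("Got it!");
  res.end(); // finish the response so the request doesn't hang
  //var schema = avro.prepareSchema("string");
  //var buffer = schema.encode("foo");
  //var value = schema.decode(buffer);
}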
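To see whether anything reaches Node.js at all, one option is to skip HTTP parsing entirely and listen with the core `net` module, logging every chunk that arrives. This is a minimal sketch, assuming Flume's Avro sink is pointed at port 4242 as in the log; `describeChunk` is just an illustrative helper. If chunks arrive but their hex preview doesn't spell out an HTTP method (`POST` would be `504f5354`), the sender isn't speaking HTTP.

```javascript
// probe.js: raw TCP listener to check whether Flume's Avro sink connects and sends bytes.
var net = require('net');

// Summarize a chunk: its length and the first 16 bytes in hex.
function describeChunk(chunk) {
  var preview = chunk.subarray(0, 16).toString('hex');
  return chunk.length + ' bytes, starting with ' + preview;
}

var probe = net.createServer(function (socket) {
  console.log('Connection from ' + socket.remoteAddress + ':' + socket.remotePort);
  socket.on('data', function (chunk) {
    console.log('Received ' + describeChunk(chunk));
  });
  socket.on('end', function () {
    console.log('Client closed the connection');
  });
  socket.on('error', function (err) {
    console.log('Socket error: ' + err.message);
  });
});

// Assumed to match the port the Flume Avro sink targets (4242 in the log).
probe.listen(4242, function () {
  console.log('Probe listening on port 4242');
});
```

Note that this probe never replies, so Flume will still eventually report a failed delivery; the point is only to confirm that bytes arrive and to see what they look like.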
I suspect I've set up the Node.js side wrong. I'm using the HTTP module, which may not be the right one. Maybe I need to consider writing a custom sink in Node.js? Pointers/help appreciated!
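If the answer turns out to be a hand-written Avro RPC endpoint in Node.js, the first layer to handle is transport framing, since the stack trace shows Flume's sink using Avro's Netty-based client (`NettyAvroRpcClient` over `NettyTransceiver`). The sketch below follows my reading of Avro Java's Netty transport codec: each frame is a 4-byte big-endian serial number, a 4-byte count of buffers, then each buffer as a 4-byte length plus its bytes. That layout is an assumption worth verifying against the Avro version bundled with Flume, and `decodeFrames`/`encodeFrame` are hypothetical helpers, not part of node-avro.

```javascript
// Hypothetical helpers for the framing assumed for Avro's Netty transport
// (layout taken from a reading of Avro Java's NettyTransportCodec; verify it):
//   [serial: int32 BE][bufferCount: int32 BE] then bufferCount x [length: int32 BE][bytes]

// Build one frame from a serial number and a list of Buffers.
function encodeFrame(serial, buffers) {
  var header = Buffer.alloc(8);
  header.writeInt32BE(serial, 0);
  header.writeInt32BE(buffers.length, 4);
  var parts = [header];
  buffers.forEach(function (buf) {
    var len = Buffer.alloc(4);
    len.writeInt32BE(buf.length, 0);
    parts.push(len, buf);
  });
  return Buffer.concat(parts);
}

// Pull every complete frame out of `data`; incomplete trailing bytes go in `rest`.
function decodeFrames(data) {
  var frames = [];
  var offset = 0;
  for (;;) {
    var start = offset;
    if (data.length - offset < 8) break; // not even a full header yet
    var serial = data.readInt32BE(offset);
    var count = data.readInt32BE(offset + 4);
    if (count < 0) throw new Error('Invalid buffer count: ' + count);
    offset += 8;
    var buffers = [];
    var complete = true;
    for (var i = 0; i < count; i++) {
      if (data.length - offset < 4) { complete = false; break; }
      var len = data.readInt32BE(offset);
      if (len < 0) throw new Error('Invalid buffer length: ' + len);
      if (data.length - offset - 4 < len) { complete = false; break; }
      buffers.push(data.subarray(offset + 4, offset + 4 + len));
      offset += 4 + len;
    }
    if (!complete) { offset = start; break; } // rewind and wait for more bytes
    frames.push({ serial: serial, buffers: buffers });
  }
  return { frames: frames, rest: data.subarray(offset) };
}
```

In a `net` server's `data` handler, keep the returned `rest`, prepend it to the next chunk with `Buffer.concat`, and decode again, since TCP can split a frame across reads. The buffers inside each frame would still carry the Avro handshake and the RPC call itself (presumably `appendBatch`, judging by the stack trace), which have to be decoded against Flume's Avro protocol schema; that part isn't sketched here. A reply would presumably reuse the request's serial so the client can match it to its pending call.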