Implementing a stream that exposes a real-time source

I've written a package called osx-audio that exposes the currently selected Mac OS X audio input as a PCM stream. It's at a very early stage, but it works. However, I'm still trying to work out the best way to implement a real-time stream.

Some items to note:

  • Very little old audio should be kept around. I want enough buffering that a slow reader can catch up, but no more than that.
  • A new reader should start as close to real time as possible. When a new reader joins the stream, there shouldn't be a backlog of old data waiting for it.
  • The source for the stream is a singleton. Making the stream available to multiple consumers should be handled in JavaScript, not by instantiating a new source.
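
To make the first two requirements concrete, here is a minimal sketch of the per-consumer buffering I have in mind. The EventEmitter named source is a hypothetical stand-in for the native singleton, and BoundedQueue and attachConsumer are illustrative names, not part of osx-audio:

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical stand-in for the native singleton source; the real one
// would emit 'message' events carrying PCM buffers.
var source = new EventEmitter();

// Bounded FIFO: keeps at most `max` chunks, dropping the oldest first.
function BoundedQueue(max) {
  this.max = max;
  this.items = [];
}

BoundedQueue.prototype.push = function (chunk) {
  this.items.push(chunk);
  if (this.items.length > this.max) {
    // Discard stale audio so a slow reader never falls far behind.
    this.items.splice(0, this.items.length - this.max);
  }
};

BoundedQueue.prototype.shift = function () {
  return this.items.shift();
};

// Each consumer gets its own queue, which starts empty, so a new reader
// only ever sees chunks that arrive after it attached.
function attachConsumer(max) {
  var queue = new BoundedQueue(max);
  var onMessage = function (chunk) { queue.push(chunk); };
  source.on('message', onMessage);
  return {
    queue: queue,
    detach: function () { source.removeListener('message', onMessage); }
  };
}
```

Because every consumer listens to the same source object, adding a reader never creates a second native input; it only adds another listener and another small queue.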

With that in mind, this is the implementation I'm currently working on (it will become version 0.2.0). What it still lacks is a reference counter: I only want to shut down the stream when there are no consumers left, so only the last consumer should be able to close it.
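
A rough sketch of the reference counting I have in mind. FakeInput is a hypothetical stand-in that models only the isOpen(), openInput() and closeInput() methods used in my code, and acquire() and release() are illustrative names:

```javascript
// Hypothetical stand-in for the native input object; only the three
// methods used by the implementation are modeled here.
function FakeInput() { this.open = false; }
FakeInput.prototype.isOpen = function () { return this.open; };
FakeInput.prototype.openInput = function () { this.open = true; };
FakeInput.prototype.closeInput = function () { this.open = false; };

var input = new FakeInput();
var refCount = 0;

// Called once per consumer; opens the device on the first acquire.
function acquire() {
  refCount += 1;
  if (!input.isOpen()) input.openInput();
  return input;
}

// Called once per consumer; closes the device only on the last release.
function release() {
  if (refCount === 0) return; // nothing to release
  refCount -= 1;
  if (refCount === 0 && input.isOpen()) input.closeInput();
}
```

The idea would be for the Input constructor to call acquire() and for closeInput() to call release(), so that closing one consumer leaves the others running.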

The implementation as it stands:

var audio = require('./audio');
var inherits = require('util').inherits;
var Readable = require('stream').Readable;
var debug = require('debug')('osxaudio:input');

var MAX_BUFFERS = 16;

var input = null;

function Input(opts) {
  if (!(this instanceof Input)) {
    return new Input(opts);
  }
  Readable.call(this, opts);

  if (!opts) opts = {};

  this._audioBuffers = [];

  if (!input) {
    input = new audio.input();
  }

  input.on('message', this._audioCallback.bind(this));

  if (!input.isOpen()) {
    debug('opening input');
    input.openInput();
  }

  debug("instantiated.");

  return this;
}
inherits(Input, Readable);

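Input.prototype._audioCallback = function(size, message) {
  // Keep at most MAX_BUFFERS chunks, discarding the oldest ones first.
  if (this._audioBuffers.push(message) > MAX_BUFFERS) {
    debug('exceeded MAX_BUFFERS, shifting older buffers');
    this._audioBuffers.splice(0, this._audioBuffers.length - MAX_BUFFERS);
  }
  // Nudge the stream so it calls _read() if it wants more data.
  this.read(0);
};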

Input.prototype._read = function(n) {
  debug("_read()");
  var chunk = this._audioBuffers.pop();
  if (typeof chunk !== 'undefined') {
    debug("_read() pushing data");
    this.push(chunk);
  }
  else {
    if (input.isOpen()) {
      debug("_read() pushing blank string");
      this.push('');
    }
    else {
      debug("_read() input is closed. push null");
      this.push(null);
    }
  }
};

Input.prototype.openInput = function() {
  if(!input) {
    input = new audio.input();
  }

  if (!input.isOpen()) {
    input.openInput();
  }
};

Input.prototype.closeInput = function() {
  if (input.isOpen()) {
    debug("closeInput() input was open, shutting down");
    input.closeInput();
  }
};

My thinking is that each consumer instantiates Input (via new Input()) and gets its own buffers. My buffering is faulty, though: there is some junk audio at the front of the stream each time, which I suspect is due to contention between my buffers and the native stream buffers behind Readable.push().
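
One thing I still need to rule out is ordering: _audioCallback appends chunks with push(), but _read() takes them with pop(), which returns the most recently appended chunk rather than the oldest. A minimal illustration with plain arrays (the chunk names are placeholders):

```javascript
// Chunks in arrival order, as _audioCallback would append them.
var buffers = ['chunk1', 'chunk2', 'chunk3'];

// pop() removes from the end, so the newest chunk comes out first.
var lifo = buffers.slice();
var popped = [lifo.pop(), lifo.pop(), lifo.pop()];

// shift() removes from the front, preserving arrival order.
var fifo = buffers.slice();
var shifted = [fifo.shift(), fifo.shift(), fifo.shift()];

console.log(popped);
console.log(shifted);
```

If that turns out to matter, swapping pop() for shift() in _read() would be the obvious thing to try, though I haven't confirmed that it explains the junk audio.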

My question is: are there existing examples of this type of system, and what are the best practices for implementing something like this? Am I on the right track?


If it helps, you can check out the library on GitHub. The latest changes (what will be 0.2.0) are in the v0.2.0 branch. Much appreciated, readers!