javascript - RxJs Dynamically add events from another EventEmitter -
i have observable coming eventemitter http connection, streaming events.
occasionally have disconnect underlying stream , reconnect. not sure how handle rxjs.
i not sure if can complete source , dynamically add other "source" source, or if have have @ bottom.
var rx = require('rx'), eventemitter = require('events').eventemitter; var eventemitter = new eventemitter(); var eventemitter2 = new eventemitter(); var source = rx.observable.fromevent(eventemitter, 'data') var subscription = source.subscribe(function (data) { console.log('data: ' + data); }); setinterval(function() { eventemitter.emit('data', 'foo'); }, 500); // eventemitter stop emitting data, underlying connection closed // attach seconds eventemitter (new connection) // obvouisly doesn't work source .fromevent(eventemitter2, 'data')
puesdo code more of doing, creating second stream connection before close first, don't "lose" data. here not sure how stop observable without "losing" records due onnext not being called due buffer.
var streams = [], notifiers = []; // create initial stream createnewstream(); setinterval(function() { if (params of stream have changed) createnewstream(); }, $1minutes / 3); function createnewstream() { var stream = new eventemitterstream(); stream.once('connected', function() { stopothers(); streams.push(stream); createsource(stream, 'name', 'id'); }); } function stopothers() { while(streams.length > 0) { streams.pop().stop(); // stop old stream } while(notifiers.length > 0) { // if call this, buffer may lose records, before onnext() called //notifiers.pop()(rx.notification.createoncompleted()); } } function createobserver(tag) { return rx.observer.create( function (x) { console.log('next: ', tag, x.length, x[0], x[x.length-1]); }, function (err) { console.log('error: ', tag, err); }, function () { console.log('completed', tag); }); } function createsource(stream, event, id) { var source = rx.observable .fromevent(stream, event) .bufferwithtimeorcount(time, max); var subscription = source.subscribe(createobserver(id)); var notifier = subscription.tonotifier(); notifiers.push(notifier); }
first , formost, need make sure can remove listeners "dead" emitter. otherwise you'll create leaky application.
it seems way you'll know eventemitter has died watch frequency, unless have event fires on error or completion (for disconnections). latter much, more preferrable.
regardless, the secret sauce of rx making sure wrap data stream creation , teardown in observable. if wrap creation of emitter in observable, means tear down, you'll able use awesome things retry
operator recreate observable.
so if have no way of knowing if died, , want reconnect it, can use this:
// i'll presume have function eventemitter // set function getemitter() { var emitter = new eventemitter(); setinterval(function(){ emitter.emit('data', 'foo'); }, 500) return emitter; } var emitterobservable = observable.create(function(observer) { // setup data stream var emitter = getemitter(); var handler = function(d) { observer.onnext(d); }; emitter.on('data', handler); return function() { // tear down data stream in disposal function emitter.removelistener('on', handler); }; }); // can rx magic! emitterobservable // if doesn't emit in 700ms, throw timeout error .timeout(700) // catch all* errors , retry // means emitter torn down , recreated // if times out! .retry() // values .subscribe(function(x) { console.log(x); });
* note: retry catches all errors, may want add catch
above handle non-timeout errors. you.
Comments
Post a Comment