rxjs - How to create an observable from pubnub subscribe -
i'm struggling how convert following observables using rxjs lib.
var client = pubnub.init({ publish_key: 'pubkey', subscribe_key: 'subkey' }); client.subscribe({ channel: 'admin:posts', message: function(message, env, channel){console.log("message");}, connect: function(){console.log("connected");}, disconnect: function(){console.log("disconnected");}, reconnect: function(){console.log("reconnected");}, error: function(){console.log("network error");}, }); i have message callback converted observable along rest of callbacks.
any ideas on how this?
thank you
update - 4/29/15
here ended using working. should add need subscribe pubnub after user signs in , clean logout.
please let me know if approach:
var self = this; var logins = rx.observable.create(function (obs) { //using session manager in ember. self.get('session').on('sessionauthenticationsucceeded', function(e){ var data = { token:this.content.secure.token, email:this.content.secure.email } obs.onnext(data); }); self.get('session').on('sessionauthenticationfailed', function(e){obs.onerror(e)}); return function(){ self.get('session').off('sessionauthenticationsucceeded', function(e){obs.onnext(e)}); self.get('session').off('sessionauthenticationfailed', function(e){obs.onerror(e)}); } }); var logouts = rx.observable.create(function (obs) { self.get('session').on('sessioninvalidationsucceeded', function(){obs.onnext()}); self.get('session').on('sessioninvalidationfailed', function(e){obs.onerror(e)}); return function(){ self.get('session').off('sessioninvalidationsucceeded', function(e){obs.onnext(e)}); self.get('session').off('sessioninvalidationfailed', function(e){obs.onerror(e)}); } }); var datastream = logins .map(function(credentials){ return pubnub.init({ publish_key: 'pub_key', subscribe_key: 'sub_key', auth_key: credentials.token, uuid: credentials.email }); }) .scan(function(prev, current){ prev.unsubscribe({ channel_group:'admin:data' }); return current; }) .concatmap(function(client){ return rx.observable.create(function (observer) { client.subscribe({ channel_group:'admin:data', message:function(message, env, channel){ observer.onnext({message:message, env:env, channel:channel, error:null}); }, error:function(error){ observer.onnext({error:error}) } }); return function(){ client.unsubscribe({channel_group:'admin:data'}); } }); }) .takeuntil(logouts) .publish() .refcount(); var sub1 = datastream.subscribe(function(data){ console.log('sub1', data); }); var sub2 = datastream.subscribe(function(data){ console.log('sub2', data); });
sure, can create source with .create such:
// client var client = pubnub.init({ publish_key: 'pubkey', subscribe_key: 'subkey' }); var source = rx.observable.create(function (observer) { client.subscribe({ channel: 'admin:posts', message: function(message){ observer.onnext(message); }, // next item error: function(err){ observer.onerror(message); }, // other functions based on logic, might want handling }); return function(){ // dispose // nothing here, yet, might want define completion } });
Comments
Post a Comment