rxjs - How to create an observable from pubnub subscribe -


i'm struggling how convert following observables using rxjs lib.

var client = pubnub.init({   publish_key: 'pubkey',   subscribe_key: 'subkey' });  client.subscribe({   channel: 'admin:posts',   message: function(message, env, channel){console.log("message");},   connect: function(){console.log("connected");},   disconnect: function(){console.log("disconnected");},   reconnect: function(){console.log("reconnected");},   error: function(){console.log("network error");},   }); 

i have message callback converted observable along rest of callbacks.

any ideas on how this?

thank you

update - 4/29/15

here ended using working. should add need subscribe pubnub after user signs in , clean logout.

please let me know if approach:

var self = this;  var logins = rx.observable.create(function (obs) {   //using session manager in ember.   self.get('session').on('sessionauthenticationsucceeded', function(e){     var data = {       token:this.content.secure.token,       email:this.content.secure.email     }      obs.onnext(data);   });   self.get('session').on('sessionauthenticationfailed', function(e){obs.onerror(e)});   return function(){     self.get('session').off('sessionauthenticationsucceeded', function(e){obs.onnext(e)});     self.get('session').off('sessionauthenticationfailed', function(e){obs.onerror(e)});   } });  var logouts = rx.observable.create(function (obs) {   self.get('session').on('sessioninvalidationsucceeded', function(){obs.onnext()});   self.get('session').on('sessioninvalidationfailed', function(e){obs.onerror(e)});   return function(){     self.get('session').off('sessioninvalidationsucceeded', function(e){obs.onnext(e)});     self.get('session').off('sessioninvalidationfailed', function(e){obs.onerror(e)});   } });  var datastream = logins   .map(function(credentials){     return pubnub.init({       publish_key: 'pub_key',       subscribe_key: 'sub_key',       auth_key: credentials.token,       uuid: credentials.email     });   })   .scan(function(prev, current){     prev.unsubscribe({       channel_group:'admin:data'     });      return current;   })   .concatmap(function(client){     return rx.observable.create(function (observer) {        client.subscribe({         channel_group:'admin:data',         message:function(message, env, channel){           observer.onnext({message:message, env:env, channel:channel, error:null});         },         error:function(error){           observer.onnext({error:error})         }       });        return function(){         client.unsubscribe({channel_group:'admin:data'});       }     });   })   .takeuntil(logouts)   .publish()   .refcount();    var sub1 = datastream.subscribe(function(data){     console.log('sub1', data);   });    var sub2 = datastream.subscribe(function(data){     console.log('sub2', data);   }); 

sure, can create source with .create such:

// client var client = pubnub.init({     publish_key: 'pubkey',     subscribe_key: 'subkey' });  var source = rx.observable.create(function (observer) {     client.subscribe({          channel: 'admin:posts',          message: function(message){ observer.onnext(message); }, // next item          error: function(err){ observer.onerror(message); },          // other functions based on logic, might want handling     });     return function(){ // dispose         // nothing here, yet, might want define completion     } }); 

Comments

Popular posts from this blog

shopping cart - Page redirect not working PHP -

php - How to modify a menu to show sub-menus -

python - Installing PyDev in eclipse is failed -