c# - RX - Group/Batch bursts of elements in an observable sequence -


i have observable sequence. when first element inserted, i start timer , batch subsequent inserted elements during timespan of timer. then, timer wouldn't start again until element inserted in sequence.

so this:

--------|=====timespan====|---------------|=====timespan====|-------------->         1  2 3 4    5                     6 7            8 

would produce:

[1,2,3,4,5], [6,7,8]  

i tried observable.buffer() , timespan experimentation, can see timer started subscribe observable sequence , restarted previous timer completed.

so having same sequence previous example , using buffer() timespan, have this:

|=====timespan====|=====timespan====|=====timespan====|=====timespan====|-->         1  2 3 4    5                      6 7           8 

which produce this:

[1,2,3,4], [5], [6,7], [8] 

here how tested behavior buffer:

var source = observable.concat(observable.timer(timespan.fromseconds(6)).select(o => 1),                                observable.timer(timespan.fromseconds(1)).select(o => 2),                                observable.timer(timespan.fromseconds(3)).select(o => 3),                                observable.never<int>());  console.writeline("{0} => started", datetime.now); source.buffer(timespan.fromseconds(4))       .subscribe(i => console.writeline("{0} => [{1}]", datetime.now, string.join(",", i))); 

with output:

4/24/2015 7:01:09 pm => started 4/24/2015 7:01:13 pm => [] 4/24/2015 7:01:17 pm => [1,2] 4/24/2015 7:01:21 pm => [3] 4/24/2015 7:01:25 pm => [] 4/24/2015 7:01:29 pm => [] 4/24/2015 7:01:33 pm => [] 

anyone has idea on how this? in advance!

give go:

var source = observable.concat(observable.timer(timespan.fromseconds(6)).select(o => 1),                            observable.timer(timespan.fromseconds(1)).select(o => 2),                            observable.timer(timespan.fromseconds(4)).select(o => 3),                            observable.never<int>());  console.writeline("{0} => started", datetime.now); source     .groupbyuntil(x => 1, g => observable.timer(timespan.fromseconds(4)))     .select(x => x.toarray())     .switch()     .subscribe(i => console.writeline("{0} => [{1}]", datetime.now, string.join(",", i))); 

i had change test code duration third timer make sure value outside of grouped timer.


Comments

Popular posts from this blog

asp.net mvc - SSO between MVCForum and Umbraco7 -

Python Tkinter keyboard using bind -

ubuntu - Selenium Node Not Connecting to Hub, Not Opening Port -