c# - TPL Dataflow, BroadcastBlock to BatchBlocks -


i have problem connecting broadcastblock(s) batchblocks. scenario sources broadcastblocks, , recipients batchblocks.

in simplified code below, 1 of supplemental action blocks executes. set batchsize each batchblock 1 illustrate problem.

setting greedy "true" make 2 actionblocks execute, that's not want cause batchblock proceed if it's not complete yet. ideas?

class program {     static void main(string[] args)     {         // possible sources broadcastblocks. more         var source1 = new broadcastblock<int>(z => z);          // batch 1         // can many potential sources, 1         // want sources arrive first before proceeding         var batch1 = new batchblock<int>(1, new groupingdataflowblockoptions() { greedy = false });          var batch1action = new actionblock<int[]>(arr =>         {             // not run             console.writeline("received batch 1 block!");             foreach (var item in arr)             {                 console.writeline("received {0}", item);             }         });          batch1.linkto(batch1action, new dataflowlinkoptions() { propagatecompletion = true });          // batch 2         // can many potential sources, 1         // want sources arrive first before proceeding         var batch2 = new batchblock<int>(1, new groupingdataflowblockoptions() { greedy = false  });         var batch2action = new actionblock<int[]>(arr =>         {             // not run             console.writeline("received batch 2 block!");             foreach (var item in arr)             {                 console.writeline("received {0}", item);             }         });         batch2.linkto(batch2action, new dataflowlinkoptions() { propagatecompletion = true });          // connect source(s)         source1.linkto(batch1, new dataflowlinkoptions() { propagatecompletion = true });         source1.linkto(batch2, new dataflowlinkoptions() { propagatecompletion = true });          // fire         source1.sendasync(3);          task.waitall(new task[] { batch1action.completion, batch2action.completion }); ;          console.readline();     } } 

you have wrong understanding greedy flag does. if equal true, batch blocks gathers data if there no sufficient amount of data gather batch. settings greedy = false, tpl dataflow: i post batch blocks, not you, batch block may or may not decide message broadcast block.

more over, block thread calling task.waitall(new task[] { batch1action.completion, batch2action.completion });, block main thread and threads each of completion tasks. may lead deadlock, threads blocked before able post messages across pipeline. also, not call source1.complete(), waitall call will never return.

what need set greedy true (which default), set batch size needed value (for example, 2), call complete() method, , do not use thread-blocking methods pipeline. doing this, batch blocks data broadcast, further blocks wouldn't data before data batch:

var source1 = new broadcastblock<int>(z => z); var options = new dataflowlinkoptions { propagatecompletion = true };  // block wouldn't execute, doesn't data greedy execution var batch1 = new batchblock<int>(2, new groupingdataflowblockoptions { greedy = false }); var batch1action = new actionblock<int[]>(arr => {     console.writeline("received batch 1 block!");     foreach (var item in arr)     {         console.writeline("received {0}", item);     }  }); batch1.linkto(batch1action, options);  // batch freedy, execute var batch2 = new batchblock<int>(2); var batch2action = new actionblock<int[]>(arr => {     console.writeline("received batch 2 block!");     foreach (var item in arr)     {         console.writeline("received {0}", item);     } }); batch2.linkto(batch2action, options);  // connect source(s) source1.linkto(batch1, options); source1.linkto(batch2, options);  // fire source1.sendasync(3); // simulate on work thread.sleep(3000); // complete batch, actionblock2 execute source1.sendasync(3);  // if need wait completion, call method source1.complete(); // note whenall isn't blocking task var alltasks = task.whenall(batch1action.completion, batch2action.completion); // non-blocking wait await alltasks; // blocking wait alltasks.wait();  console.readline(); 

Comments

Popular posts from this blog

asp.net mvc - SSO between MVCForum and Umbraco7 -

Python Tkinter keyboard using bind -

ubuntu - Selenium Node Not Connecting to Hub, Not Opening Port -