C# - TPL Dataflow, BroadcastBlock to BatchBlocks
I have a problem connecting BroadcastBlock(s) to BatchBlocks. The scenario is that the sources are BroadcastBlocks, and the recipients are BatchBlocks.
In the simplified code below, only one of the supplemental action blocks executes. I even set the batchSize for each BatchBlock to 1 to illustrate the problem.
Setting Greedy to "true" would make the 2 ActionBlocks execute, but that's not what I want, because it will cause the BatchBlock to proceed even if it's not complete yet. Any ideas?
    using System;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    class Program
    {
        static void Main(string[] args)
        {
            // The possible sources are BroadcastBlocks. There could be more.
            var source1 = new BroadcastBlock<int>(z => z);

            // Batch 1
            // There can be many potential sources; this example has 1.
            // I want all sources to arrive first before proceeding.
            var batch1 = new BatchBlock<int>(1, new GroupingDataflowBlockOptions() { Greedy = false });
            var batch1Action = new ActionBlock<int[]>(arr =>
            {
                // This does not run
                Console.WriteLine("received batch 1 block!");
                foreach (var item in arr)
                {
                    Console.WriteLine("received {0}", item);
                }
            });
            batch1.LinkTo(batch1Action, new DataflowLinkOptions() { PropagateCompletion = true });

            // Batch 2
            // There can be many potential sources; this example has 1.
            // I want all sources to arrive first before proceeding.
            var batch2 = new BatchBlock<int>(1, new GroupingDataflowBlockOptions() { Greedy = false });
            var batch2Action = new ActionBlock<int[]>(arr =>
            {
                // This does not run
                Console.WriteLine("received batch 2 block!");
                foreach (var item in arr)
                {
                    Console.WriteLine("received {0}", item);
                }
            });
            batch2.LinkTo(batch2Action, new DataflowLinkOptions() { PropagateCompletion = true });

            // Connect the source(s)
            source1.LinkTo(batch1, new DataflowLinkOptions() { PropagateCompletion = true });
            source1.LinkTo(batch2, new DataflowLinkOptions() { PropagateCompletion = true });

            // Fire
            source1.SendAsync(3);

            Task.WaitAll(new Task[] { batch1Action.Completion, batch2Action.Completion });
            Console.ReadLine();
        }
    }
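You have a wrong understanding of what the Greedy flag does. If it is equal to true, the batch block accepts data as soon as it is offered, even if there is not yet a sufficient amount of data to form a batch. By setting Greedy = false, you are telling TPL Dataflow: "the batch block will decide when to take messages, not you", so the batch block may or may not decide to take the message from the broadcast block.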
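To make the greedy behavior concrete, here is a minimal sketch, assuming a .NET project where System.Threading.Tasks.Dataflow is available. The names GreedyBatchDemo and CollectBatchesAsync are hypothetical and exist only for this illustration. It posts three values to a default (greedy) BatchBlock with a batch size of 2: the block accepts every value immediately, emits a full batch once two have arrived, and emits the leftover value as a final, smaller batch when it is completed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class GreedyBatchDemo
{
    // Hypothetical helper: feeds the values through a greedy BatchBlock
    // and returns the batches it emitted, in order.
    public static async Task<List<int[]>> CollectBatchesAsync(int batchSize, params int[] values)
    {
        var batches = new List<int[]>();

        // Greedy is true by default, so no options are needed here.
        var batch = new BatchBlock<int>(batchSize);

        // ActionBlock processes one batch at a time by default,
        // so adding to the list from here is safe.
        var sink = new ActionBlock<int[]>(arr => batches.Add(arr));
        batch.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var value in values)
        {
            batch.Post(value); // a greedy block accepts each value right away
        }

        batch.Complete();      // flushes any partial batch, then completes
        await sink.Completion; // non-blocking wait for the pipeline to drain
        return batches;
    }

    public static async Task Main()
    {
        var batches = await CollectBatchesAsync(2, 1, 2, 3);
        foreach (var b in batches)
        {
            Console.WriteLine(string.Join(",", b)); // prints 1,2 then 3
        }
    }
}
```

The batch size, not the Greedy flag, is what holds the downstream action block back until enough items have arrived.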
Moreover, you block your thread by calling Task.WaitAll(new Task[] { batch1Action.Completion, batch2Action.Completion });, which blocks the main thread until each of the Completion tasks finishes. This may lead to a deadlock if threads are blocked before they are able to post messages across the pipeline. Also, you do not call source1.Complete(), so the WaitAll call will never return.
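The point about Complete() can be checked with a small sketch, again assuming System.Threading.Tasks.Dataflow is available. The names CompletionDemo, FinishesWithinAsync and ObserveAsync are hypothetical, and the timeouts are arbitrary values chosen for the illustration. It checks the downstream Completion task once before the source is completed and once after.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletionDemo
{
    // Hypothetical helper: true if the task finishes before the timeout elapses.
    private static async Task<bool> FinishesWithinAsync(Task task, TimeSpan timeout)
    {
        var winner = await Task.WhenAny(task, Task.Delay(timeout));
        return winner == task;
    }

    // Returns whether the sink completed before and after source.Complete().
    public static async Task<(bool Before, bool After)> ObserveAsync()
    {
        var source = new BroadcastBlock<int>(z => z);
        var sink = new ActionBlock<int>(_ => { });
        source.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        await source.SendAsync(3);

        // Nobody has signalled completion yet, so the sink stays open.
        bool before = await FinishesWithinAsync(sink.Completion, TimeSpan.FromMilliseconds(200));

        // Completion now flows from the source to the sink through the link.
        source.Complete();
        bool after = await FinishesWithinAsync(sink.Completion, TimeSpan.FromSeconds(5));

        return (before, after);
    }

    public static async Task Main()
    {
        var (before, after) = await ObserveAsync();
        Console.WriteLine(before); // False
        Console.WriteLine(after);  // True
    }
}
```

Because everything is awaited, no thread sits blocked while the pipeline is working.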
What you need is to set Greedy to true (which is the default), set the batch size to the needed value (for example, 2), call the Complete() method, and do not use thread-blocking methods in the pipeline. By doing this, your batch blocks will get the data from the broadcast block, but the blocks further down won't get the data before it has been batched:
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    var source1 = new BroadcastBlock<int>(z => z);
    var options = new DataflowLinkOptions { PropagateCompletion = true };

    // This block wouldn't execute, as it doesn't get the data without greedy execution
    var batch1 = new BatchBlock<int>(2, new GroupingDataflowBlockOptions { Greedy = false });
    var batch1Action = new ActionBlock<int[]>(arr =>
    {
        Console.WriteLine("received batch 1 block!");
        foreach (var item in arr)
        {
            Console.WriteLine("received {0}", item);
        }
    });
    batch1.LinkTo(batch1Action, options);

    // This batch is greedy, so it will execute
    var batch2 = new BatchBlock<int>(2);
    var batch2Action = new ActionBlock<int[]>(arr =>
    {
        Console.WriteLine("received batch 2 block!");
        foreach (var item in arr)
        {
            Console.WriteLine("received {0}", item);
        }
    });
    batch2.LinkTo(batch2Action, options);

    // Connect the source(s)
    source1.LinkTo(batch1, options);
    source1.LinkTo(batch2, options);

    // Fire
    source1.SendAsync(3);

    // Simulate some work
    Thread.Sleep(3000);

    // This completes the batch, so batch2Action will execute
    source1.SendAsync(3);

    // If you need to wait for completion, call this method
    source1.Complete();

    // Note that WhenAll returns a task and isn't blocking
    var allTasks = Task.WhenAll(batch1Action.Completion, batch2Action.Completion);

    // Non-blocking wait (needs an async context)
    await allTasks;

    // Blocking wait, as an alternative when you cannot use await
    allTasks.Wait();

    Console.ReadLine();