c# - RabbitMQ returns same message again and again -
i try unit test rather basic scenario - worker queue 2 workers , 1 publisher scenario, keeps returning same message on , on again queue.
the following code in test puts 1 100 messages queue , 2 consumers eat them up. problem keep getting message 1 , 2. tried separate acknowledgement method, since in application takes time message process (commented method confirm) - threw exception token unknown:
the amqp operation interrupted: amqp close-reason, initiated peer, code=406, text="precondition_failed - unknown delivery tag 1", classid=60, methodid=80, cause=
it seems acknowledgement broken somehow. tried switch off - no luck either.
class:
using system; using system.text; using newtonsoft.json; using rabbitmq.client; using rabbitmq.client.events; namespace backend.mq.ocr { public class batchqueue : queuebase<batchmessage> { private readonly imodel _channel; private const string qprefix = "ocrbatches_"; private readonly queueingbasicconsumer _consumer; private ulong _latesttoken = ulong.maxvalue; private readonly string _jobid; public batchqueue(string connectionstring, string jobid): base(connectionstring) { _jobid = jobid; var factory = new connectionfactory() { hostname = connectionstring }; var connection = factory.createconnection(); _channel = connection.createmodel(); _channel.queuedeclare(name, true, false, false, null); //binding consumers _channel.basicqos(0, 1, false); _consumer = new queueingbasicconsumer(_channel); _channel.basicconsume(name, false, _consumer); } public override void publish(batchmessage msg) { var message = jsonconvert.serializeobject(msg); var body = encoding.utf8.getbytes(message); var properties = _channel.createbasicproperties(); properties.setpersistent(true); _channel.basicpublish("", name, properties, body); #if debug system.diagnostics.trace.writeline("[x] sent task:" + msg); #endif } private string name { { return qprefix + _jobid; } } public override batchmessage receive() { var ea = (basicdelivereventargs)_consumer.queue.dequeue(); var body = ea.body; _channel.basicack(ea.deliverytag, false); return jsonconvert.deserializeobject<batchmessage>(encoding.utf8.getstring(body)); } public override void confirm() { //if (_latesttoken < ulong.maxvalue) _channel.basicack(_latesttoken, false); } } }
unit tests:
#if nunit using testclass = nunit.framework.testfixtureattribute; using testmethod = nunit.framework.testattribute; using testcleanup = nunit.framework.teardownattribute; using testinitialize = nunit.framework.setupattribute; using classcleanup = nunit.framework.testfixtureteardownattribute; using classinitialize = nunit.framework.testfixturesetupattribute; #else #endif using system.threading.tasks; using system.threading; using system; using system.collections.generic; using backend.mq.ocr; using microsoft.visualstudio.testtools.unittesting; #if nunit using massert = nunit.framework.assert; #else using massert = microsoft.visualstudio.testtools.unittesting.assert; #endif namespace mq.test { [testclass] public class batchqueuetest { [testmethod] public void concurrencytest() { var batchname = guid.newguid().tostring(); var queue = new batchqueue("localhost", batchname); var tasks = new list<task>(); var counter = 0; (int = 0; < 100; i++) { queue.publish(new batchmessage() { files = new list<string>() { i.tostring() } }); } (int = 0; < 2; i++) { var task = task.factory.startnew(() => { var q = new batchqueue("localhost", batchname); var res = q.receive(); while (res != null) { system.diagnostics.trace.writeline(res.files[0]); q.confirm(); interlocked.increment(ref counter); } }); tasks.add(task); } var ok = task.waitall(tasks.toarray(), timespan.fromseconds(30)); massert.istrue(ok, "tasks didnt complete in time"); massert.areequal(counter, 100, "not messages have been processed"); } } }
your unit test starts 2 tasks. before while loop receive message keep confirming same message inside while loop:
var q = new batchqueue("localhost", batchname); //receive message 1 or 2 var res = q.receive(); while (res != null) { //infinite loop system.diagnostics.trace.writeline(res.files[0]); q.confirm(); interlocked.increment(ref counter); }
try put var res = q.receive();
inside loop
Comments
Post a Comment