c# - RabbitMQ returns same message again and again -


i try unit test rather basic scenario - worker queue 2 workers , 1 publisher scenario, keeps returning same message on , on again queue.

the following code in test puts 1 100 messages queue , 2 consumers eat them up. problem keep getting message 1 , 2. tried separate acknowledgement method, since in application takes time message process (commented method confirm) - threw exception token unknown:

the amqp operation interrupted: amqp close-reason, initiated peer, code=406, text="precondition_failed - unknown delivery tag 1", classid=60, methodid=80, cause=

it seems acknowledgement broken somehow. tried switch off - no luck either.

class:

using system; using system.text; using newtonsoft.json; using rabbitmq.client; using rabbitmq.client.events;  namespace backend.mq.ocr {     public class batchqueue : queuebase<batchmessage>     {         private readonly imodel _channel;         private const string qprefix = "ocrbatches_";         private readonly queueingbasicconsumer _consumer;         private ulong _latesttoken = ulong.maxvalue;         private readonly string _jobid;         public batchqueue(string connectionstring, string jobid):             base(connectionstring)         {             _jobid = jobid;             var factory = new connectionfactory()             {                 hostname = connectionstring             };             var connection = factory.createconnection();             _channel = connection.createmodel();             _channel.queuedeclare(name, true, false, false, null);             //binding consumers             _channel.basicqos(0, 1, false);             _consumer = new queueingbasicconsumer(_channel);             _channel.basicconsume(name, false, _consumer);         }          public override void publish(batchmessage msg)         {             var message = jsonconvert.serializeobject(msg);             var body = encoding.utf8.getbytes(message);             var properties = _channel.createbasicproperties();             properties.setpersistent(true);             _channel.basicpublish("", name, properties, body); #if debug             system.diagnostics.trace.writeline("[x] sent task:" + msg); #endif          }          private string name         {             { return qprefix + _jobid; }         }           public override batchmessage receive()         {             var ea =                     (basicdelivereventargs)_consumer.queue.dequeue();              var body = ea.body;             _channel.basicack(ea.deliverytag, false);             return jsonconvert.deserializeobject<batchmessage>(encoding.utf8.getstring(body));         }           public override void confirm()         {             //if (_latesttoken < ulong.maxvalue) _channel.basicack(_latesttoken, false);         }     } } 

unit tests:

#if nunit using testclass = nunit.framework.testfixtureattribute; using testmethod = nunit.framework.testattribute; using testcleanup = nunit.framework.teardownattribute; using testinitialize = nunit.framework.setupattribute; using classcleanup = nunit.framework.testfixtureteardownattribute; using classinitialize = nunit.framework.testfixturesetupattribute; #else #endif using system.threading.tasks; using system.threading; using system; using system.collections.generic; using backend.mq.ocr; using microsoft.visualstudio.testtools.unittesting; #if nunit using massert = nunit.framework.assert; #else using massert = microsoft.visualstudio.testtools.unittesting.assert; #endif  namespace mq.test {     [testclass]     public class batchqueuetest     {         [testmethod]         public void concurrencytest()         {             var batchname = guid.newguid().tostring();             var queue = new batchqueue("localhost", batchname);             var tasks = new list<task>();             var counter = 0;             (int = 0; < 100; i++)             {                 queue.publish(new batchmessage()                 {                     files = new list<string>() { i.tostring() }                 });             }             (int = 0; < 2; i++)             {                 var task = task.factory.startnew(() =>                 {                     var q = new batchqueue("localhost", batchname);                     var res = q.receive();                     while (res != null)                     {                         system.diagnostics.trace.writeline(res.files[0]);                         q.confirm();                         interlocked.increment(ref counter);                     }                 });                 tasks.add(task);             }             var ok = task.waitall(tasks.toarray(), timespan.fromseconds(30));             massert.istrue(ok, "tasks didnt complete in time");             massert.areequal(counter, 100, "not messages have been processed");          }     } } 

your unit test starts 2 tasks. before while loop receive message keep confirming same message inside while loop:

var q = new batchqueue("localhost", batchname); //receive message 1 or 2 var res = q.receive(); while (res != null) {   //infinite loop     system.diagnostics.trace.writeline(res.files[0]);     q.confirm();     interlocked.increment(ref counter); } 

try put var res = q.receive(); inside loop


Comments

Popular posts from this blog

asp.net mvc - SSO between MVCForum and Umbraco7 -

Python Tkinter keyboard using bind -

ubuntu - Selenium Node Not Connecting to Hub, Not Opening Port -