Thursday, December 4, 2014

MSMQ reading and writing

MSMQ (Microsoft Message Queuing) is Microsoft's message queue implementation, which lets different applications communicate with each other reliably. It supports sending objects and streams and does not require any additional software, since it ships with Windows as an optional component. However, it has no built-in subscriber (or any other bus management) implementation. We used MSMQ for logging in order to reduce database load, so we needed a reliable way of reading from and writing to it. Let’s see how it can be done.

First of all, you need to enable MSMQ. The queue manager can then be accessed through "Computer Management" (compmgmt.msc) -> expand "Services and Applications" -> expand "Message Queuing".

In this article I’ll show the implementation of two helpers for reading from and writing to MSMQ, and in the second part I’ll show how to use them both for logging purposes.

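Once MSMQ is set up, you can initialize an instance of an existing queue:

var queue = new MessageQueue(queuePath, true);
or create a new one:
var queue = MessageQueue.Create(queuePath, true);

Notice the ‘true’ flag passed to Create: it makes the queue transactional, and I’ll explain below why that is very important. In the constructor the same-looking flag means something different: it is sharedModeDenyReceive, which gives the first application that accesses the queue exclusive read access. Whether an existing queue is transactional is decided when the queue is created, and it can be checked through queue.Transactional.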
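The two calls above can be combined into a small "open or create" helper. This is only a sketch: the QueueFactory class, the OpenOrCreate name and the queue path in the usage line are hypothetical, and it assumes a local queue path, for which MessageQueue.Exists can be used.

```csharp
using System.Messaging;

public static class QueueFactory
{
    // Opens the queue at queuePath, or creates it as a transactional queue
    // if it does not exist yet. (Hypothetical helper, not from the original code.)
    public static MessageQueue OpenOrCreate(string queuePath)
    {
        if (MessageQueue.Exists(queuePath))
        {
            // An existing queue keeps whatever transactional setting it was created with.
            return new MessageQueue(queuePath);
        }

        // 'true' here means "transactional".
        return MessageQueue.Create(queuePath, true);
    }
}
```

Usage could look like var queue = QueueFactory.OpenOrCreate(@".\Private$\LogQueue"); where the path is just an example. If the queue already existed, it is worth checking queue.Transactional before relying on transactions.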

MSMQ is pretty simple, which is both a blessing and a curse. There is not much you can do with the messages: basically you can get them all, receive a specific message by id, or peek at a message. Receiving by id removes the message from the queue, so if an exception happens after the message has been received, that message is pretty much lost. You can avoid that by using a transactional queue and wrapping the receive in a transaction.

So reading messages from the queue can be implemented like this:

// ...
Func<Message, bool> handler = /* Message handler: for example, parsing the message and writing it to a file. Returns true on success. */;
// get all messages (this method does not remove them from queue)
var messageCollection = queue.GetAllMessages();
foreach (var item in messageCollection)
{
    // can't do much here, so just receive the message
    if (!queue.Transactional)
    {
        var message = queue.ReceiveById(item.Id);
        handler(message);
    }
    else
    {
        using (var transaction = new MessageQueueTransaction())
        {
            transaction.Begin();

            // this will be removed from queue after we commit the transaction
            var message = queue.ReceiveById(item.Id, transaction);

            bool succeed = true;
            try
            {
                succeed = handler(message);
            }
            catch
            {
                succeed = false;
            }
            // if message was handled successfully we can remove it from queue by committing the transaction
            if (succeed)
            {
                transaction.Commit();
            }
            else
            {
                // If handling failed and no message lifetime is specified, abort the transaction.
                // In this case the message stays in the queue.
                if (!messageLifePeriod.HasValue)
                {
                    transaction.Abort();
                }
                else
                {
                    // If handling failed but a message lifetime is specified, an expired message
                    // is removed from the queue by committing the transaction. A message that has
                    // not expired yet is explicitly rolled back so it can be retried later.
                    try
                    {
                        if (DateTime.Now - item.ArrivedTime >= messageLifePeriod.Value)
                        {
                            transaction.Commit();
                        }
                        else
                        {
                            transaction.Abort();
                        }
                    }
                    catch
                    {
                        transaction.Abort();
                    }
                }
            }
        }
    }
}

That was reading from a transactional queue. I stripped some code from this sample so it would be easier to read.
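For comparison, here is a rough sketch of a different technique, not the one used in the sample: instead of taking a GetAllMessages snapshot, it calls Receive with a timeout inside a transaction until the queue stays empty. QueueDrainer, Drain and the timeout parameter are hypothetical names, and the sketch assumes a transactional queue. An aborted message goes back to the queue and would be received again right away, so this version simply stops at the first message the handler rejects.

```csharp
using System;
using System.Messaging;

public static class QueueDrainer
{
    // Receives messages one at a time until the queue stays empty for 'timeout'
    // or the handler rejects a message. Returns the number of handled messages.
    public static int Drain(MessageQueue queue, Func<Message, bool> handler, TimeSpan timeout)
    {
        int handled = 0;
        while (true)
        {
            using (var transaction = new MessageQueueTransaction())
            {
                transaction.Begin();
                try
                {
                    // Removed from the queue only if the transaction is committed.
                    Message message = queue.Receive(timeout, transaction);
                    if (!handler(message))
                    {
                        // Roll back so the message stays in the queue, then stop.
                        transaction.Abort();
                        return handled;
                    }

                    transaction.Commit();
                    handled++;
                }
                catch (MessageQueueException ex)
                {
                    transaction.Abort();
                    if (ex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                    {
                        // No message arrived within the timeout: the queue is drained.
                        return handled;
                    }
                    throw;
                }
                catch (Exception)
                {
                    // The handler threw: keep the message in the queue and rethrow.
                    transaction.Abort();
                    throw;
                }
            }
        }
    }
}
```

The trade-off is that this version never copies the whole queue into memory, but it cannot skip over a failing message the way the snapshot loop above does.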

Writing to MSMQ looks something like this:

try
{
    var msg = new Message(message, /* message formatter */);

    if (this.MessageQueue.Transactional)
    {
        using (var transaction = new MessageQueueTransaction())
        {
            try
            {
                transaction.Begin();
                this.MessageQueue.Send(msg, transaction);
                transaction.Commit();
            }
            catch (Exception)
            {
                transaction.Abort();
                throw;
            }
        }
    }
    else
    {
        this.MessageQueue.Send(msg);
    }
}
catch (Exception)
{
    // do whatever
}

Notice the /* message formatter */ comment. Both the reader and the writer use a message formatter to serialize and deserialize messages. In this example I use XmlMessageFormatter by default.
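To make the formatter part more concrete, here is a minimal sketch of how both sides might share an XmlMessageFormatter. The LogEntry type and the LogEntryMessages helper are hypothetical and only serve as an illustration; the actual payload type in the source code may differ.

```csharp
using System;
using System.Messaging;

// Hypothetical payload type. XmlMessageFormatter relies on XML serialization,
// so the type needs to be public and have a public parameterless constructor.
public class LogEntry
{
    public DateTime Timestamp { get; set; }
    public string Text { get; set; }
}

public static class LogEntryMessages
{
    // A fresh formatter per call, because formatter instances are not
    // guaranteed to be thread safe.
    private static XmlMessageFormatter CreateFormatter()
    {
        return new XmlMessageFormatter(new Type[] { typeof(LogEntry) });
    }

    // Writer side: the formatter passed here serializes the body when the message is sent.
    public static Message ToMessage(LogEntry entry)
    {
        return new Message(entry, CreateFormatter());
    }

    // Reader side: a matching formatter must be set before Body is read,
    // otherwise the body cannot be deserialized into LogEntry.
    public static LogEntry FromMessage(Message message)
    {
        message.Formatter = CreateFormatter();
        return (LogEntry)message.Body;
    }
}
```

With this in place, the writer would send LogEntryMessages.ToMessage(entry) and the handler on the reader side would call LogEntryMessages.FromMessage(message).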

And here is the source code to play with.
