RabbitMQ MassTransit Kullanarak Producer Consumer Yapısı

Daha önceki RabbitMQ Nedir yazımızda genel hatlarıyla rabbitmq dan bahsedip windows makina üzerinde kurulumunu anlatıp kısaca; erişilebilirliği, güvenilirliği yüksek ve ölçeklenebilen loosly-coupled asenkron iletişim sağlayan uygulamalar geliştirmektir diye tanımlamıştık.

Bu yazımızda ise Masstransit kullanarak bir Producer Consumer projesi oluşturacağız ancak makinamızda rabbitmq kurulu çalışır vaziyette olduğundan emin olalım.

Enterprise Service Bus (ESB) Nedir ?

Enterprise service bus diğer ismiyle message broker; rabbitmq gibi messaging katmanlarının üzerinde bulunan ve distributed synchronous yada asynchronous communication sağlayabilen bir middleware' dir. Bize birbirinden tamamen farklı olabilen uygulamalar arasında message-based communication yapabilmemizi sağlayan bir transport gateway'i dir. Diğer bir deyişle; message'ları provider ve consumer arasında transport eden bir çeşit middleware tool'u dur. 

 

 

Şimdi ise masstransit kullanarak bir örnek üzerinden ilerleyelim. Örneğimiz şu şekilde olsun; günlük şirket bültenini tüm çalışanlara email olarak gönderen bir producer/consumer yapısı tasarlayalım.

VS da sırasıyla 3 proje oluşturacağız.

  1. RMQMessage(class lib.)
  2. RMQProducer(console app.)
  3. RMQConsumer(console app.)

1-) RMQMessage

İlk projemiz olan RMQMessage ile başlayalım. Vs da RMQMessage adından bir class-library oluşturalım. Masstransit'in message-based communication sağladığını söylemiştik ve hem producer hemde consumer tarafından kullanılacak bu message yada contract diyede adlandırdığımız model ve onun sub-model'lerini oluşturacağız. 

İlk olarak abstract olan BaseMessage.cs'i oluşturalım ve bu model içerisinde ortak olmasını istediğimiz property'ler yer alsın.

    public abstract class BaseMessage
    {
        public DateTime PublishedTime { get; set; }
        public DateTime ConsumedTime { get; set; }
        public Guid QueueId { get; set; }
    }

Sonrasında yukarıda bahsettiğimiz producer ve consumer'ın kullanacağı NewsletterMailMessage adında contract'ımızı aşağıdaki oluşturalım.

    public class NewsletterMailMessage: BaseMessage
    {
        public List<string> AddressList { get; set; }
        public NewsletterModel NewsLetter { get; set; }
    }
    public class NewsletterModel
    {
        public string MailSubject{ get; set; }
        public string HtmlContent { get; set; }
    }

Yukarıda modellerimizi oluşturduk ve RMQMessage projesindeki işimiz bitti. Şimdi sırada 2. projeyi oluşturmak var.

2-) RMQProducer

Bunun için aynı solution'da RMQProducer adında bir console app oluşturalım. Bu proje queue'ya ilgili message'ı publish etmeden sorumlu olacak. Projeyi oluşturduktan sonra nuget üzerinden package manager console kullanarak aşağıdaki masstransit.rabbitmq paketini yükleyelim.

Masstransit.RabbitMQ

 Install-Package MassTransit.RabbitMQ

Bu paket ile birlikte rabbitmq için kullanılan masstransit kütüphanesi onun dependency'leri projemize kurulmuş olacak.

Kurulum tamamlandıktan sonra RMQMessage projesini RMQProducer projesine reference olarak ekleyelim. Ekleme nedenimiz NewsletterMailMessage.cs modelini kullanabilmek.

Program.cs içerisine aşağıdaki gibi InitializeBus adında bir metot tanımlayalım ve bu metot IBusControl arayüzünü initialize etmekten sorumlu olsun.

    static IBusControl InitializeBus()
    {
        return Bus.Factory.CreateUsingRabbitMq(cfg =>
         {
             cfg.Host(new Uri("rabbitmq://localhost/"), h =>
             {
                 h.Username("guest");
                 h.Password("guest");
             });
         });
    }

local makinada kurulu olan rabbitmq url'i rabbitmq://localhost/ dir ve default username password ise guest olarak tanımlanmıştır.

RabbitMQ ya bağlantı kısmını halletik şimdi ise message objemizi initialize edelim.

    static NewsletterMailMessage InitializeMessage()
    {
        var message = new NewsletterMailMessage
        {
            AddressList = _customerService.GetAllMailAddresses(), //assume more than 2000
            NewsLetter = new NewsletterModel
            {
                MailSubject = "Daily Newsletter of Contoso Corp.",
                HtmlContent = "Lorem ipsum dolor sit amet, et nam mucius docendi hendrerit, an usu decore mandamus. Ei qui quod decore, cum nulla nostrud erroribus ut, est eu aperiri interesset. Legere mentitum per an. Hinc legimus nostrum cu vix."
            },
            PublishedTime = DateTime.Now,
            QueueId = Guid.NewGuid()
        };

        Console.WriteLine("Message published !\nSubject : " + message.NewsLetter.MailSubject + "\nPublished at : " + message.PublishedTime + "\nQueueId : " + message.QueueId);
        return message;
    }

Son adım olarak main fonksiyonu içerisinde bu iki metodu call ederek message'ı queue'ya push edip Producer projemizdeki işlemleri bitirelim.

   static void Main(string[] args)
   {
       var busControl = InitializeBus();
       busControl.Start();
       Console.WriteLine("Started publishing.");

       var message = InitializeMessage();

       busControl.Publish(message);
       Console.ReadLine();
   }

 

3-) RMQConsumer

Sırada son adım olan consumer projesini oluşturma var. Consumer Producer'ın gönderdiği message'ları dinleyerek ilgili queue için consume etmeden sorumlu. Projeyi oluşturduktan sonra nuget üzerinden package manager console kullanarak aşağıdaki masstransit.rabbitmq paketini yükleyelim.

Masstransit.RabbitMQ

 Install-Package MassTransit.RabbitMQ

Bu paket ile birlikte rabbitmq için kullanılan masstransit kütüphanesi onun dependency'leri projemize kurulmuş olacak.

Kurulum tamamlandıktan sonra RMQMessage projesini RMQConsumer projesine reference olarak ekleyelim. Ekleme nedenimiz NewsletterMailMessage.cs modelini kullanabilmek.

İlk olarak rabbitmq ile connection sağlama ve hangi queue kullanılacak gibi konfigurasyonları Pragram.cs içerisine tanımlayalım.

    static void Main(string[] args)
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
            {
                h.Username("guest");
                h.Password("guest");
            });

            cfg.ReceiveEndpoint(host, "DailyNewsletterMail", e =>
            e.Consumer<NewsletterMailMessageConsumer>());
        });

        busControl.Start();
        Console.WriteLine("Started consuming.");
        Console.ReadLine();
    }

Yukarıdaki kod bloğu şunu söylüyor; eğer rabbitmq local makinamızda kurulu ise adresi rabbitmq://localhost/ şeklindedir ve default userName-password guest olarak gelmektedir. Queue ismi olarak DailyNewsletterMail verdik ve bu queue için consume edilecek olan message da NewsletterMailMessageConsumer şeklinde belirttik.

Şimdi ise NewsletterMailMessageConsumer.cs adında message objemizi consume edecek olan kısmı geliştirelim.

public class NewsletterMailMessageConsumer : IConsumer<NewsletterMailMessage>
{
    public Task Consume(ConsumeContext<NewsletterMailMessage> context)
    {
        var message = context.Message;
        message.ConsumedTime = DateTime.Now;

        Console.WriteLine("Message consumed !\nSubject : " + message.NewsLetter.MailSubject + "\nConsumed at: " + message.ConsumedTime + "\nQueueId : " + message.QueueId);

        //todo send mail impr.

        return context.CompleteTask;
    }
}

Yukarıdaki kod blou şunu anlatıyor; NewsletterMailMessageConsumer adında IConsumer interface'inin implement eden ve bu implementasyon sonucunda Consume metoduna sahip olan bir consumer'ı mız var. Bu consumer DailyNewsletterMail queue'suna gelen NewsletterMailMessage modelini consume eder.

Sırasıyla Consumer'ı ve Producer'ı run ettiğimizde ilk olarak rmq management-console da DailyNewsletterMail queue'su aşağıdaki gibi listelenecektir.

 Producer ve Consumer projelerinin ayrı ayrı console çıktıları ise aşağıdaki gibi olacaktır.

Aynı QueueId ye ait message'ımız Producer dan çıkıp consumer'a ulaştığı bilgisini yukarıdaki gibi görüyoruz.

Queue yapıları özellikle async mesajlaşma gerektiren yerlerde oldukça önemlidir ve hayat kurtarır. Bu yazıda service bus'lar dan Masstransit kullanarak basit bir consume/producer uygulaması geliştirdik ve daha sonraki yazılarımızda diğer queue yapıları ile ilgilide konuları ele almaya devam edeceğiz.

RabbitMQ Nedir ? Windows Üzerinde Kurulumu

Messaging Queue (MQ), fire-and-forget communication dediğimiz asynchronous çalışma yapısı üzerine kurulmuş yapılar için günümüz yazılım dünyasının en popüler yapısıdır. Bu yapılara örnek olarak; JMS, MSMQ, RabbitMQ, Kafka etc. verebiliriz ve  genel çerçeveden baktığımızda messaging queue'ler bir sender-receiver şeklinde çalışırlar.

RabbitMQ Nedir

RabbitMQ, Erlang dili ile open-source olarak geliştirilen ve Open Telecom Platform kütüphanesi üzerinde build edilebilen günümüz server-to-server/app-to-app communication ihtiyaçları konusunda giderek popüler olan hızlı bir messaging queue yapısıdır. Advanced Message Queuing Protocol (AMQP) implement ederek uygulamalar ve server'lar arası veri alışverişini sağlar.

Rabbitmq Publisher ve Consumer mantığıyla çalışır. Örneğin data-exchange yapmanız gereken bir iş var bunu rabbitmq üzerinden publisher'ile ilgili queue'ya publish edip sonrasında bu queue'yu consume edecek bir consumer projesi oluşturup yapmak istediğimiz bu işlemi consumer'a yaptırabiliriz. Bu işlemler genelde ana uygulama üzerinde yapmak istemeyeceğimiz yükü fazla olan işlemler olabilir.

Daha basit anlatacak olursak; sunucu RabbitMQ sunucusuna bir message gönderir ve sunucu bu mesajı ilgili queue'ya yönlendirir. Sonrasında başka bir uygulama bu queue'yu dinler ve FIFO mantığıyla çalışan kuyruktaki bu mesajları consume ederek süreci sonlandırır. Sahip olduğu Web Management Interface ile de bulunan queue'ları görüntüleyebilme, requeue etme, delete, purge gibi daha bir çok işlemi yapabilmemizi sağlar.

Queue yapıları aslında projelerimiz için birer middleware görevi görmektedir. Ana uygulamanızdan queue ya push ettiğiniz message Consumer down olsa dahi o message'ı queue'da bekletir ve consumer tekrardan start olduğunda o message'ı tekrar tekrar push etmeye çalışır ve böylelikle veri kaybının da önüne geçmemizi sağlar. 

Bazı term'lere bakacak olursak;

Producer/Publisher : Queue'ya message'ı gönderen yapıya verilen isimdir.
Consumer : Queue'yu dinleyerek ilgili message'ları receive eden yapıdır.                                                                                                                                               Queue : First-in-first-out mantığıyla çalışan kuyruk yapımız.
Exchange : Routing yani kuyruğa iletilen message'ı route eden yapıdır ve routing işlemini yapan çeşitli yapılar bulunmaktadır.

 

Kurulum

İlk olarak rabbitmq erlang dili ile geliştirildiğinden makinamızda sahip olduğumuz işletim sistemine göre uygun versiyona ait erlang dosyalarını erlang.org sitesinden indirip kuralım.

 

Sonrasında ise rabbitmq.com sitesinden Windows için güncel rabbitmq server versiyonunu indirip kuralım.

Kurulumlar sorunsuz bir şekilde tamamlandıktan sonra yukarıda da bahsettiğimiz web arayüzünü aktifleştirelim. Bunun için rabbimq sbin dosyası içerisinde olan RabbitMQ Command Prompt'ı çalıştıralım ve aşağıdaki komut satırını yazalım.

> rabbitmq-plugins enable rabbitmq_management

RabbitMQ windows makinada Windows Service olarak çalışır ve yukarıda yazdığımız web arayüzünü enable etme komutunun hemen çalışabilmesi için rabbitmq'yu aşağıdaki gibi stop/start edelim.

> net service stop RabbitMQ
.....
> net service start RabbitMQ

Şimdi ise browser üzerinden http://localhost:15672/ adresine giderek login için default credential'lar username/password guest/guest olarak girdikten sonra aşağıdaki gibi arayüzü göreceğiz.

Şuan mevcutta herhangi bir queue oluşturmadığımızdan üstte bulunan exchange tab'ına tıkladığımızda aşağıdaki gibi default exchange listesini görüntüleyebiliriz.

RabbitMQ Windows Üzerinden kurulumu bu kadardı.

Sonraki yazılarda Exchange tool'larındanbirini kullanarak basit bir publisher consumer örneği ile devam edeceğiz.