Caner Tosuner

Leave your code better than you found it

.Net Core Kafka Kurulum ve Producer Consumer Kullanımı

Daha önceki fire-and-forget yapılarını incelerken rabbitmq üzerinde masstransit kullanarak anlatıp örnek projeler ile incelemiştik. Bu yazımızda ise .Net Core uygulamarında apache kafka kullanımına değineceğiz. 

Messaging queue yapıları ana uygulamanızın yükünü azaltmak ve microservice mimarisinin fire-and-forget yapılarının en yaygın çözümlerinden biri olarak yazılım geliştirme hayatımızda yer edinmekte. Apache Kafka ise bu yapılardan biri olarak open source geliştirilen distributed, scalable ve high-performance sunabilen bir publish-and-subscribe message broker dır. High volumes of data yani oldukça yüksek hacimli verileri işleyebilmek adına kullanabileceğimiz teknolojilerin başında gelmektedir.

Architecture

Apache Kafka'nın mimarisine ve terminolojide geçen terimlere bakacak olursak;

Kafka bir veya birden fazla sunucu üzerinde bir cluster oluşturarak çalışır ve kafka üzerindeki her bir record key-value ve timestamp bilgileri kullanılarak topic olarak adlandırılan kategoriler içerisinde store edilir.

Kafka basic olarak aşağıdaki 4 ana başlıktan oluşur;

  • Cluster : broker olarak adlandırılan bir veya birden fazla server'ların yer aldığı collection.
  • Producer : message'ları publish eden yani kafka'ya message üreten yapının/uygulamanın adı.
  • Consumer : publish edilmiş message'ları retrieve/consume eden uygulama.
  • Zookeper : distributed olarak multiple instance çalışan uygulamaları koordine etmede kullanılan bir uygulamadır.

Yukarıda da bahsettiğimiz gibi kafka'da her bir data => message olarak adlandırılır. Kafka her bir mesajı byte array'ler şeklinde key-value olarak timestamp bilgisi ile saklar. Her bir kafka server'ı broker olarak adlandırılır.  Producer-consumer ve cluster'lar arası iletişim TCP protokolü ile kurulur ve cluester'a yeni broker'lar ekleyerek kafka'yı horizontal olarak scale edebiliriz.

 

Producer ilgili message'ları kafkaya push eder ve kafka mesajları partition dediğimiz sıralı mesaj dizinleri olarak dinamik bir şekilde daha önceden kendisine subscribe olmuş consumer'lar tarafından alınmak üzere hazırlar ve sırası geldiğinde consume edilir.

Installlations

Surce code bölümünde docker-compose dosyasını run ederekte kurulumları yapabilirsiniz ancak biz local makinada teker teker manuel olarak kurulumları yapalım. Kafka'nın java ve zookeper dependency'leri bulunmakta ve bunun için ilk olarak makinamıza JRE8 ve Zookeper yüklememiz gerekmekte. 

  • JRE 8 Installation

İşletim sistemi versiyonunuza göre bu adresten JRE8'i indirip kuralım.

  • Zookeeper Installation

JRE'den sonra bu adresten zookeeper'ın son stable versiyonunu indirip kuralım.

Zookeeper indirdikten sonra C dizinine dosyaları çıkartalım "C:\zookeeper-3.4.13”. Daha sonra config klasörü içerisinde bulunan zoo_sample.cfg dosyasının ismini zoo.cfg olarak değiştirelim. Sonrasında bu dosyanın içine gidip dataDir'e settings'ini dataDir=/data olarak güncelleyelim.

Son olarak ise işletim sistemi system variable'larına hem JAVA_HOME'u hemde ZOOKEEPER_HOME'u tanımlamak var. Bunun için makinanızda system variable'larına aşağıdaki gibi JAVA_HOME ve ZOOKEEPER_HOME variable'larını tanımlayıp Path bölümüne de bunların path bilgilerini geçelim.

JAVA_HOME  => %JAVA_HOME%\bin

ZOOKEEPER_HOME = > %ZOOKEEPER_HOME%\bin

 Zookeper server'ı run etmek için command prompt'tan olarak zkserver yazmak yeterli.

 

  • Kafka Installation

Kafka için bu adresten kafkanın binary dosyalarını inderelim ve ilgili dosyaları C:/kafka dizinie exract edelim. Powershell yada cmd kullanarak kafka dizinine gidip şu komutu çalıştıralım;

 .\bin\windows\kafka-server-start.bat ./config/server.properties

bu komutla birlikte kafka çalışmaya başlayacaktır.

Bütün kurumlarımızı tamamladık şimdi sırada Producer ve Consumer uygulamalarını oluşturmak var.

Application

Yazının başında da söylediğimiz gibi bir .Net Core uyglaması üzerinde kafka kullanacağız ve örnek proje olarak, email göndermede kullanılan bir producer-consumer uygulaması geliştirelim. İlk olarak aşağıdaki gibi Kafka message'ını tanımlayalım. Bu message sınıfı hem consumer hemde producer tarafından kullanılacağından solution'da Kafka.Message adında bir .Net Core class-library projesi içerisinde tanımlı olsun.

1) Kafka.Message

public class EmailMessage:IMessageBase
{
    public string To { get; set; }
    public string Subject { get; set; }
    public string Content { get; set; }
}

Bu message sınıfına ait verileri producer tarafından kafka'da bulunan emailmessage-topic adındaki topic collection'ına bırakılacak. Yine aynı solution'da Kafka.Producer adında bir console application oluşturalım.

2) Kafka.Producer

Nuget üzerinde kafka client olarak kullanılabilecek belli başlı bazı kütüphaneler bulunmakta. .Net Core uyumluluğu açısından biz örnek projede Confluent.Kafka client'ını kullanacağız. Her ne kadar beta versiyonu olsada github-rating'leri bakımından oldukça beğenilen bir kütüphanedir.

Install-Package Confluent.Kafka -Version 1.0-beta

Producer'da belirtilen topic için kafka ya message push etmede kullanacağımız IMessageProducer interface ve implementasyonunu aşağıdaki gibi tanımlayalım ve kullanım olarakda Program.cs içerisinde Main func'da Produce metodunu call ederek EmailMessage'ını push edelim.

public interface IMessageProducer
{
    void Produce(string topic, IMessageBase message);
}

public class MessageProducer : IMessageProducer
{
    public void Produce(string topic, IMessageBase message)
    {
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

        using (var producer = new Producer<Null, string>(config))
        {
            var textMessage = JsonConvert.SerializeObject(message);
           
            producer.BeginProduce(topic, new Message<Null, string> { Value = textMessage }, OnDelivery);

            // wait for up to 10 seconds for any inflight messages to be delivered.
            producer.Flush(TimeSpan.FromSeconds(10));
        }
    }

    private void OnDelivery(DeliveryReportResult<Null, string> r)
    {
        Console.WriteLine(!r.Error.IsError ? $"Delivered message to {r.TopicPartitionOffset}" : $"Delivery Error:{r.Error.Reason}");
    }
}

static void Main(string[] args)
{
    IMessageProducer messageProducer = new MessageProducer();

    //produce email message
    var emailMessage = new EmailMessage
    {
        Content = "Contoso Retail Daily News Email Content",
        Subject = "Contoso Retail Daily News",
        To = "all@contosoretail.com.tr"
    };
    messageProducer.Produce("emailmessage-topic", emailMessage);
    
    Console.ReadLine();
}

Dilerseniz topic oluşturma ve message produce işlemlerini command-prompt üzerinden de yapabilirsiniz, biz örnek projede için kafka client kullanarak topic oluşturduk.

3. Kafka.Consumer

Consumer projeside kafka da emailmessage-topic'ine push edilen message'ları consume edip ilgili business'ları process eden uygulamamız olacaktır. Bunun için solution'da Kafka.Consumer adında bir Console Application oluşturalım ve yine nuget üzerinden Confluent.Kafka kütüphanesini projemiz referanslarına ekleyelim.

Kurulumu tamamladıktan sonra consume işleminde kullanacağımız abstract MessageConsumerBase sınıfını aşağıdaki gibi tanımlayalım.

public abstract class MessageConsumerBase<IMessage>
{
    private readonly string _topic;

    protected MessageConsumerBase(string topic)
    {
        this._topic = topic;
    }

    public void StartConsuming()
    {
        var conf = new ConsumerConfig
        {
            GroupId = "emailmessage-consumer-group",
            BootstrapServers = "localhost:9092",
            AutoOffsetReset = AutoOffsetResetType.Earliest
        };

        using (var consumer = new Consumer<Ignore, string>(conf))
        {
            consumer.Subscribe(_topic);

            var keepConsuming = true;
            consumer.OnError += (_, e) => keepConsuming = !e.IsFatal;

            while (keepConsuming)
            {
                try
                {
                    var consumedTextMessage = consumer.Consume();
                    Console.WriteLine($"Consumed message '{consumedTextMessage.Value}' Topic: {consumedTextMessage.Topic}'.");

                    var message = JsonConvert.DeserializeObject<IMessage>(consumedTextMessage.Value);

                    OnMessageDelivered(message);
                }
                catch (ConsumeException e)
                {
                    OnErrorOccured(e.Error);
                }
            }

            // Ensure the consumer leaves the group cleanly and final offsets are committed.
            consumer.Close();
        }
    }

    public abstract void OnMessageDelivered(IMessage message);

    public abstract void OnErrorOccured(Error error);
}

Bu base sınıfı inherit almış EmailMessageConsumer sınıfı StartConsuming() metodunu call ederek consume etmeye başlamasını sağlayan kod bloğunu Program.cs içerisinde aşağıdaki gibi tanımlayalım.

public class EmailMessageConsumer : MessageConsumerBase<EmailMessage>
{
    public EmailMessageConsumer() : base("emailmessage-topic") { }

    public override void OnMessageDelivered(EmailMessage message)
    {
        Console.WriteLine($"To: {message.To} \nContent: {message.Content} \nSubject: {message.Subject}");

        //todo email send business logic
    }

    public override void OnErrorOccured(Error error)
    {
        Console.WriteLine($"Error: {error}");

        //todo onerror business
    }
}

static void Main(string[] args)
{
    Console.WriteLine("Consumer Started !");

    var emailMessageConsumer = new EmailMessageConsumer();
    emailMessageConsumer.StartConsuming();
    
    Console.ReadLine();
}

Örnek uygulama geliştirmemiz bitti. Önce producer ardında consumer projelerini sırasıyla run edip producer tarafından üretilen mesajın kafka üzerinden consumer tarafından consume edilip data-transfer'in sağlandığını görebilirsiniz.

Kafka günümüz itibariyle rakiplerine gore data-transmission'ı daha hızlı ve performanslı olması açısından özellikle real-time streaming uygulamalar için en iyi çözüm olarak kabul edilmekte. RabbitMQ, MSMQ, IBM MQ ve Kafka gibi messaging yapılarının arasından neden kafka diye sorduğumuzda; kafka özellikle huge-amount-of-data transfer söz konusu olduğunda (örnek olarak: IOT ve Chat yapıları ) sektör tarafından en iyi seçenek olarak kabul edilmekte. Eğer uygulamanız hızlı ve scalable bir message-broker'a ihtiyaç duyarsa kafka müthiş bir seçenek olacaktır.

Source Code

Yorum ekle

Loading