Caner Tosuner

Leave your code better than you found it

.Net Core Kafka Kurulum ve Producer Consumer Kullanımı

Daha önceki fire-and-forget yapılarını incelerken rabbitmq üzerinde masstransit kullanarak anlatıp örnek projeler ile incelemiştik. Bu yazımızda ise .Net Core uygulamarında apache kafka kullanımına değineceğiz. 

Messaging queue yapıları ana uygulamanızın yükünü azaltmak ve microservice mimarisinin fire-and-forget yapılarının en yaygın çözümlerinden biri olarak yazılım geliştirme hayatımızda yer edinmekte. Apache Kafka ise bu yapılardan biri olarak open source geliştirilen distributed, scalable ve high-performance sunabilen bir publish-and-subscribe message broker dır. High volumes of data yani oldukça yüksek hacimli verileri işleyebilmek adına kullanabileceğimiz teknolojilerin başında gelmektedir.

Architecture

Apache Kafka'nın mimarisine ve terminolojide geçen terimlere bakacak olursak;

Kafka bir veya birden fazla sunucu üzerinde bir cluster oluşturarak çalışır ve kafka üzerindeki her bir record key-value ve timestamp bilgileri kullanılarak topic olarak adlandırılan kategoriler içerisinde store edilir.

Kafka basic olarak aşağıdaki 4 ana başlıktan oluşur;

  • Cluster : broker olarak adlandırılan bir veya birden fazla server'ların yer aldığı collection.
  • Producer : message'ları publish eden yani kafka'ya message üreten yapının/uygulamanın adı.
  • Consumer : publish edilmiş message'ları retrieve/consume eden uygulama.
  • Zookeper : distributed olarak multiple instance çalışan uygulamaları koordine etmede kullanılan bir uygulamadır.

Yukarıda da bahsettiğimiz gibi kafka'da her bir data => message olarak adlandırılır. Kafka her bir mesajı byte array'ler şeklinde key-value olarak timestamp bilgisi ile saklar. Her bir kafka server'ı broker olarak adlandırılır.  Producer-consumer ve cluster'lar arası iletişim TCP protokolü ile kurulur ve cluester'a yeni broker'lar ekleyerek kafka'yı horizontal olarak scale edebiliriz.

 

Producer ilgili message'ları kafkaya push eder ve kafka mesajları partition dediğimiz sıralı mesaj dizinleri olarak dinamik bir şekilde daha önceden kendisine subscribe olmuş consumer'lar tarafından alınmak üzere hazırlar ve sırası geldiğinde consume edilir.

Installlations

Surce code bölümünde docker-compose dosyasını run ederekte kurulumları yapabilirsiniz ancak biz local makinada teker teker manuel olarak kurulumları yapalım. Kafka'nın java ve zookeper dependency'leri bulunmakta ve bunun için ilk olarak makinamıza JRE8 ve Zookeper yüklememiz gerekmekte. 

  • JRE 8 Installation

İşletim sistemi versiyonunuza göre bu adresten JRE8'i indirip kuralım.

  • Zookeeper Installation

JRE'den sonra bu adresten zookeeper'ın son stable versiyonunu indirip kuralım.

Zookeeper indirdikten sonra C dizinine dosyaları çıkartalım "C:\zookeeper-3.4.13”. Daha sonra config klasörü içerisinde bulunan zoo_sample.cfg dosyasının ismini zoo.cfg olarak değiştirelim. Sonrasında bu dosyanın içine gidip dataDir'e settings'ini dataDir=/data olarak güncelleyelim.

Son olarak ise işletim sistemi system variable'larına hem JAVA_HOME'u hemde ZOOKEEPER_HOME'u tanımlamak var. Bunun için makinanızda system variable'larına aşağıdaki gibi JAVA_HOME ve ZOOKEEPER_HOME variable'larını tanımlayıp Path bölümüne de bunların path bilgilerini geçelim.

JAVA_HOME  => %JAVA_HOME%\bin

ZOOKEEPER_HOME = > %ZOOKEEPER_HOME%\bin

 Zookeper server'ı run etmek için command prompt'tan olarak zkserver yazmak yeterli.

 

  • Kafka Installation

Kafka için bu adresten kafkanın binary dosyalarını inderelim ve ilgili dosyaları C:/kafka dizinie exract edelim. Powershell yada cmd kullanarak kafka dizinine gidip şu komutu çalıştıralım;

 .\bin\windows\kafka-server-start.bat ./config/server.properties

bu komutla birlikte kafka çalışmaya başlayacaktır.

Bütün kurumlarımızı tamamladık şimdi sırada Producer ve Consumer uygulamalarını oluşturmak var.

Application

Yazının başında da söylediğimiz gibi bir .Net Core uyglaması üzerinde kafka kullanacağız ve örnek proje olarak, email göndermede kullanılan bir producer-consumer uygulaması geliştirelim. İlk olarak aşağıdaki gibi Kafka message'ını tanımlayalım. Bu message sınıfı hem consumer hemde producer tarafından kullanılacağından solution'da Kafka.Message adında bir .Net Core class-library projesi içerisinde tanımlı olsun.

1) Kafka.Message

public class EmailMessage:IMessageBase
{
    public string To { get; set; }
    public string Subject { get; set; }
    public string Content { get; set; }
}

Bu message sınıfına ait verileri producer tarafından kafka'da bulunan emailmessage-topic adındaki topic collection'ına bırakılacak. Yine aynı solution'da Kafka.Producer adında bir console application oluşturalım.

2) Kafka.Producer

Nuget üzerinde kafka client olarak kullanılabilecek belli başlı bazı kütüphaneler bulunmakta. .Net Core uyumluluğu açısından biz örnek projede Confluent.Kafka client'ını kullanacağız. Her ne kadar beta versiyonu olsada github-rating'leri bakımından oldukça beğenilen bir kütüphanedir.

Install-Package Confluent.Kafka -Version 1.0-beta

Producer'da belirtilen topic için kafka ya message push etmede kullanacağımız IMessageProducer interface ve implementasyonunu aşağıdaki gibi tanımlayalım ve kullanım olarakda Program.cs içerisinde Main func'da Produce metodunu call ederek EmailMessage'ını push edelim.

public interface IMessageProducer
{
    void Produce(string topic, IMessageBase message);
}

public class MessageProducer : IMessageProducer
{
    public void Produce(string topic, IMessageBase message)
    {
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

        using (var producer = new Producer<Null, string>(config))
        {
            var textMessage = JsonConvert.SerializeObject(message);
           
            producer.BeginProduce(topic, new Message<Null, string> { Value = textMessage }, OnDelivery);

            // wait for up to 10 seconds for any inflight messages to be delivered.
            producer.Flush(TimeSpan.FromSeconds(10));
        }
    }

    private void OnDelivery(DeliveryReportResult<Null, string> r)
    {
        Console.WriteLine(!r.Error.IsError ? $"Delivered message to {r.TopicPartitionOffset}" : $"Delivery Error:{r.Error.Reason}");
    }
}

static void Main(string[] args)
{
    IMessageProducer messageProducer = new MessageProducer();

    //produce email message
    var emailMessage = new EmailMessage
    {
        Content = "Contoso Retail Daily News Email Content",
        Subject = "Contoso Retail Daily News",
        To = "all@contosoretail.com.tr"
    };
    messageProducer.Produce("emailmessage-topic", emailMessage);
    
    Console.ReadLine();
}

Dilerseniz topic oluşturma ve message produce işlemlerini command-prompt üzerinden de yapabilirsiniz, biz örnek projede için kafka client kullanarak topic oluşturduk.

3. Kafka.Consumer

Consumer projeside kafka da emailmessage-topic'ine push edilen message'ları consume edip ilgili business'ları process eden uygulamamız olacaktır. Bunun için solution'da Kafka.Consumer adında bir Console Application oluşturalım ve yine nuget üzerinden Confluent.Kafka kütüphanesini projemiz referanslarına ekleyelim.

Kurulumu tamamladıktan sonra consume işleminde kullanacağımız abstract MessageConsumerBase sınıfını aşağıdaki gibi tanımlayalım.

public abstract class MessageConsumerBase<IMessage>
{
    private readonly string _topic;

    protected MessageConsumerBase(string topic)
    {
        this._topic = topic;
    }

    public void StartConsuming()
    {
        var conf = new ConsumerConfig
        {
            GroupId = "emailmessage-consumer-group",
            BootstrapServers = "localhost:9092",
            AutoOffsetReset = AutoOffsetResetType.Earliest
        };

        using (var consumer = new Consumer<Ignore, string>(conf))
        {
            consumer.Subscribe(_topic);

            var keepConsuming = true;
            consumer.OnError += (_, e) => keepConsuming = !e.IsFatal;

            while (keepConsuming)
            {
                try
                {
                    var consumedTextMessage = consumer.Consume();
                    Console.WriteLine($"Consumed message '{consumedTextMessage.Value}' Topic: {consumedTextMessage.Topic}'.");

                    var message = JsonConvert.DeserializeObject<IMessage>(consumedTextMessage.Value);

                    OnMessageDelivered(message);
                }
                catch (ConsumeException e)
                {
                    OnErrorOccured(e.Error);
                }
            }

            // Ensure the consumer leaves the group cleanly and final offsets are committed.
            consumer.Close();
        }
    }

    public abstract void OnMessageDelivered(IMessage message);

    public abstract void OnErrorOccured(Error error);
}

Bu base sınıfı inherit almış EmailMessageConsumer sınıfı StartConsuming() metodunu call ederek consume etmeye başlamasını sağlayan kod bloğunu Program.cs içerisinde aşağıdaki gibi tanımlayalım.

public class EmailMessageConsumer : MessageConsumerBase<EmailMessage>
{
    public EmailMessageConsumer() : base("emailmessage-topic") { }

    public override void OnMessageDelivered(EmailMessage message)
    {
        Console.WriteLine($"To: {message.To} \nContent: {message.Content} \nSubject: {message.Subject}");

        //todo email send business logic
    }

    public override void OnErrorOccured(Error error)
    {
        Console.WriteLine($"Error: {error}");

        //todo onerror business
    }
}

static void Main(string[] args)
{
    Console.WriteLine("Consumer Started !");

    var emailMessageConsumer = new EmailMessageConsumer();
    emailMessageConsumer.StartConsuming();
    
    Console.ReadLine();
}

Örnek uygulama geliştirmemiz bitti. Önce producer ardında consumer projelerini sırasıyla run edip producer tarafından üretilen mesajın kafka üzerinden consumer tarafından consume edilip data-transfer'in sağlandığını görebilirsiniz.

Kafka günümüz itibariyle rakiplerine gore data-transmission'ı daha hızlı ve performanslı olması açısından özellikle real-time streaming uygulamalar için en iyi çözüm olarak kabul edilmekte. RabbitMQ, MSMQ, IBM MQ ve Kafka gibi messaging yapılarının arasından neden kafka diye sorduğumuzda; kafka özellikle huge-amount-of-data transfer söz konusu olduğunda (örnek olarak: IOT ve Chat yapıları ) sektör tarafından en iyi seçenek olarak kabul edilmekte. Eğer uygulamanız hızlı ve scalable bir message-broker'a ihtiyaç duyarsa kafka müthiş bir seçenek olacaktır.

Source Code

Chain of Responsibility Pattern Nedir

Chain of Responsibility pattern behavioral patterns gurubuna ait olan ve özünde birbirini takip eden iş dizisine ait process'leri redirect ve handle etmek yada istekte bulunan-confirm eden süreçleri için çözüm olarak ortaya çıkmış bir tasarım desendir.

Yukarıda tanım yaparken birbirini takip eden iş dizesinden kasıt birbirlerine Loosly Coupled bir şekilde zincir gibi bağlı olan süreçleri process etmek için kullanabileceğimiz bir pattern dir.

Bir örnek ile ele alacak olursak; veznede çalışan bir kişi için günlük nakit para çekim miktarı 40 bin TL olan bir banka düşünelim ve bu bankaya gelen bir müşteri veznede bulunan kişiden 240 bin TL para çekmek istediğini söyledi. Banka kuralları gereği bu işlemin sırasıyla veznedar, yönetici, müdür ve bölge sorumlusu tarafından sırasıyla onaylaması gerekmekte. Bakacak olduğumuzda zincir şeklinde birbirine bağlı olan bir onay yapısı bulunmakta. 

Akış olarak özetleyecek olursak;

  1.  Müşteri 480 bin TL lik para çekme isteğini veznedar'a iletir.
  2.  Veznedar bu isteği alır ve kontrol eder eğer onaylayabileceği bir tutar ise onaylar, onaylayabileceği bir tutar değilse yöneticisine gönderir,
  3.  Yönetici isteği alır  onaylayabileceği bir tutar değilse müdüre iletir,
  4.  Müdür kontrol eder eğer onaylayabileceği bir tutar değilse bölge sorumlusunun onayına gönderir,
  5.  Bölge sorumlusu onaylar ve para müşteriye verilir.

Yukarıda bahsettiğimiz bu örneğimizi Chain of Responsibility pattern uygulayarak geliştirelim.

İlk olarak Withdraw adında domain model tanımlayalım.

   public class Withdraw
    {
        public string CustomerId { get; }
        public decimal Amount { get; }
        public string CurrencyType { get; }
        public string SoruceAccountId { get; }

        public Withdraw(string customerId, decimal amount, string currencyType, string soruceAccountId)
        {
            CustomerId = customerId;
            Amount = amount;
            CurrencyType = currencyType;
            SoruceAccountId = soruceAccountId;
        }
    }

Sornasında abstract bir Employee sınıfı tanımlayalım ve içerisinde aşağıdaki gibi property'lerinı yazalım.

    public abstract class Employee
    {
        protected Employee NextApprover;

        public void SetNextApprover(Employee supervisor)
        {
            this.NextApprover = supervisor;
        }

        public abstract void ProcessRequest(Withdraw req);
    }

Yukarıda bulunan NextApprover isimli property o sınıfa ait kişinin yöneticisi olarak atanan kişidir ve ProcessRequest metodunda ilgili condition'ı yazıp sırasıyla NextApprover'ları call edeceğiz.

Veznedar, Yonetici, Mudur ve BolgeSorumlusu sınıfları yukarıda tanımladığımız Employee sınıfından inherit olacak şekilde aşağıdaki gibi oluşturalım. 

    public class Veznedar : Employee
    {
        public override void ProcessRequest(Withdraw req)
        {
            if (req.Amount <= 40000)
            {
                Console.WriteLine("{0} tarafından para çekme işlemi onaylandı #{1}",
                    this.GetType().Name, req.Amount);
            }
            else if (NextApprover != null)
            {
                Console.WriteLine("{0} TL işlem tutarı {1} max. limitini aştığından işlem yöneticiye gönderildi.", req.Amount, this.GetType().Name);

                NextApprover.ProcessRequest(req);
            }
        }
    }

    public class Yonetici : Employee
    {
        public override void ProcessRequest(Withdraw req)
        {
            if (req.Amount <= 70000)
            {
                Console.WriteLine("{0} tarafından para çekme işlemi onaylandı #{1} TL",
                    this.GetType().Name, req.Amount);
            }
            else if (NextApprover != null)
            {
                Console.WriteLine("{0} TL işlem tutarı {1} max. limitini aştığından işlem yöneticiye gönderildi.", req.Amount, this.GetType().Name);

                NextApprover.ProcessRequest(req);
            }
        }
    }

    public class Mudur : Employee
    {
        public override void ProcessRequest(Withdraw req)
        {
            if (req.Amount <= 150000)
            {
                Console.WriteLine("{0} tarafından para çekme işlemi onaylandı #{1} TL",
                    this.GetType().Name, req.Amount);
            }
            else if (NextApprover != null)
            {
                Console.WriteLine("{0} TL işlem tutarı {1} max. limitini aştığından işlem yöneticiye gönderildi.", req.Amount, this.GetType().Name);

                NextApprover.ProcessRequest(req);
            }
        }
    }

    public class BolgeSorumlusu : Employee
    {
        public override void ProcessRequest(Withdraw req)
        {
            if (req.Amount <= 750000)
            {
                Console.WriteLine("{0} tarafından para çekme işlemi onaylandı #{1} TL",
                    this.GetType().Name, req.Amount);
            }
            else
            {
                throw new Exception(
                    $"Limit banka günlük işlem limitini aştığından para çekme işlemi #{req.Amount} TL onaylanmadı.");
            }
        }
    }

Son olarak ise domain modeli initialize edip chain'i oluşturup process metodunu call edelim.

    static void Main(string[] args)
    {
        var withdraw = new Withdraw("a6e193dc-cdbb-4f09-af1a-dea307a9ed15", 480000, "TRY", "TR681223154132432141412");
 
        Employee veznedar = new Veznedar();
        Employee yonetici = new Yonetici();
        Employee mudur = new Mudur();
        Employee bolgeSorumlusu = new BolgeSorumlusu();
 
        veznedar.SetNextApprover(yonetici);
        yonetici.SetNextApprover(mudur);
        mudur.SetNextApprover(bolgeSorumlusu);
 
 
        withdraw.Process(veznedar);
 
        Console.ReadKey();
    }

Yukarıdaki gibi 480000 TL lik bir işlem için istekte bulunduğumuzda console çıktısı aşağıdaki gibi olacaktır.

İşlem sırasıyla veznedar, yönetici, müdür bölge sorumlusunun önünde düşecektir. Son olarak ise bölge sorumlusunun onaylayabileceği bir tutar olduğundan onay verip banka müşterisine ödeme işlemini yapacaktır.

Chain of Responsibility pattern bir chain halinde process edilmesi gereken operasyonlar için rahatlıkla kullanabileceğimiz bir pattern dir. Yazılım dünyasında kullanım olarak diğer desing pattern'lar arasında %30-%40 lık bir orana sahip olduğu iddia edilir ve sıkça kullanılmaktadır.

NULL Object Pattern Nedir ?

NULL Object Pattern Gang of Four’s Design Patterns kitabında anlatılmış olup behavioral design pattern'ler den biridir. Bu pattern'in amacı uygulama içeresinde null objeler return etmek yerine ilgili tipin yerine geçen ve expected value'nun null objesi olarak kabul edilen tipi geriye dönmektir diğer bir değişle null yerine daha tutarlı nesneler dönmektir. Bu nesne asıl return edilmesi gereken nesnenin null değeri olarak kabul edilirken onunla aynı özelliklere sahip değildir, çok daha az bilgi içermektedir. NULL Object Pattern , süreli olarak null kontrolü yaparak hem server-side hemde client-side için boilerplate code yazmaya engel olmak amacıyla ortaya çıkmış bir pattern dir.

Platform yada dil farketmeksizin geliştirme yaparken sürekli olarak nullreferenceexception aldığımız durumlar olmuştur bu durumdan kurtulmak adına obj null mı değil mi diye bir sürü if/else kontrolleri yaparız. Bu pattern'i kullanarak biraz sonraki örnekte yapacağımız gibi boilerplate code'lar yazmaktan nasıl kurtulabiliriz bunu inceleyeceğiz.

Örneğimizi 2 şekilde ele alalım. İlk olarak geriye null değer return ederek çoğunlukla nasıl geliştirme yapıyoruz o case'i ele alalım, sonrasında ise NULL Object Pattern kullanarak nasıl geliştirebiliriz onu inceleyelim.

Öncelikle Customer adında bir nesnemiz var ve repository kullanarak geriye bu nesneyi return edelim. 

    public class Customer
    {
        public int Id { get; set; }
        public string FirstName { get; set; }
        public string LastNae { get; set; }
        public int NumberOfChildren { get; set; }
        public string GetFullName()
        {
            return FirstName + " " + LastName;
        }
    }

Service katmanında generic bir repository yapımız varmış gibi varsayalım ve repository üzerinden GetCustomerByFirstName adında bir metot tanımlayalım.

public class CustomerService
    {
        public Customer GetCustomerByFirstName(string firstName)
        {
            return _customerRepository.List(c => c.FirstName == firstName).FirstOrDefault();
        }
    }

Sonrasında yukarıda tanımladığımız metodu call yaparak geriye customer objesini dönelim ve bazı değerleri ekrana yazdıralım.

   var customerService = new CustomerService();
   var customer = customerService.GetCustomerByFirstName("tosuner");
   Console.WriteLine("FullName : " + customer.GetFullName() + "\nNumber Of Childreen:" + customer.NumberOfChildren);

Yukarıdaki gibi customer'ın null geldiği durumda exception thrown 'system.nullreferenceexception' hatasını çoktan aldık gibi yani memory'de değeri assing edilmemiş bir yere erişmeye çalışıyoruz. Peki çözüm olarak ne yapabilirdik, ilk akla gelen aşağıdaki gibi bir kontrol olacaktır.

    var customerService = new CustomerService();
    var customer = customerService.GetCustomerByFirstName("tosuner");
    if (customer != null)
    {
        Console.WriteLine("FullName : " + customer.GetFullName() + "\nNumber Of Childreen:" + customer.NumberOfChildren);
    }
    else
    {
        Console.WriteLine("Name : Customer Not Found !" + "\nNumber Of Childreen: 0");
    }

Yukarıdaki gibi bir çözüme gittiğimizde customer objesini get ettiğimiz bir sürü yer olduğunu düşünün ve her yerde sürekli olarak null kontrolü yapıp sonrasında console'a değerleri yazıyor oluruz. Aslında bu şu deme değil;"null kontrolü yapma arkadaş !" kesinlikle bu değil tabikide ihtiyaç duyulan yerlerde bu kontrol yapılmalı hatta birçok case'de null ise throw new CustomBusinessException() vs şeklinde exception'da throw edeceğimiz durumlar olabilir. Demek istediğim yukarıdaki gibi client'a bu kontrolü olabildiğince bırakmamak.

NULL Object Pattern uygulayarak nasıl bir çözüm getirirdik ona bakalım. İlk olarak AbstractCustomer adında base sınıfımızı oluşturalım.

    public abstract class AbstractCustomer
    {
        public abstract int Id { get; set; }
        public abstract string FirstName { get; set; }
        public abstract string LastName { get; set; }
        public abstract int NumberOfChildren { get; set; }
        public abstract string GetFullName();
    }

Sonrasında Customer objesini bu sınıftan türetelim.

    public class Customer : AbstractCustomer
    {
        public override string FirstName { get; set; }
        public override string LastName { get; set; }
        public override int NumberOfChildren { get; set; }
        public override int Id { get; set; }

        public override string GetFullName()
        {
            return FirstName + " " + LastName;
        }
    }

Şimdi ise bu pattern'in getirdiği çözüm olarak geriye null value dönmeyip asıl return edilmek istenen sınıf yerine onun null olduğunu belirten bir sınıf geriye dönelim. Bu sınıfa da NullCustomer adını verelim.

    public class NullCustomer : AbstractCustomer
    {
        public override string FirstName { get; set; }
        public override string LastName { get; set; }
        public override int NumberOfChildren { get; set; }
        public override int Id { get; set; }

        public override string GetFullName()
        {
            return "Customer Not Found !";
        }
    }

Sonrasında service katmanını aşağıdaki gibi düzenleyelim.

    public class CustomerService
    {
        public AbstractCustomer GetCustomerByFirstName(string firstName)
        {
            return _customerRepository.Where(c => c.FirstName == firstName).FirstOrDefault().GetValue();
        }
    }
    public static class CustomerExtensions
    {
        public static AbstractCustomer GetValue(this AbstractCustomer customer)
        {
            return customer == null ? new NullCustomer() : customer;
        }
    }

Yukarıdaki kod bloğunda görüldüğü üzre repository null değer dönmek yerine yeni bir NullCustomer sınıfı return edecektir.

Son adım olarak da cient tarafında yazılacak kod ise yazımızın ilk başında yazdığımız kod bloğu ile aynı.

   var customerService = new CustomerService();
   var customer = customerService.GetCustomerByFirstName("tosuner");
   Console.WriteLine("FullName : " + customer.GetFullName() + "\nNumber Of Childreen:" + customer.NumberOfChildren);

Bu pattern ile;

  • null reference kontrollerinden kurtulduk,
  • duplicate kod oranını azalttık,
  • memory de değeri olmayan bir alana erişmek yerine null value görevi gören bir nesneye eriştik,
  • dahası client tarafı için daha temiz ve kolay anlaşılır bir kod bıraktık,

Daha öncede belirtiğim gibi bu pattern'i her zaman uygulama gibi bir durum söz konusu değil, daha doğrusu sürekli null check yapmak yerine bu pattern'i uygulayalım gibi bir düşünce doğru değil. Client-side geliştirme yapan developer'a bu kontrolleri yaptırmak istemediğimizde yada "ben server-side'dan hiçbir zaman null dönmicem.." şeklinde bir garanti vermek istediğinizde kullanabileceğimiz bir pattern dir.

RabbitMQ Nedir ? Windows Üzerinde Kurulumu

Messaging Queue (MQ), fire-and-forget communication dediğimiz asynchronous çalışma yapısı üzerine kurulmuş yapılar için günümüz yazılım dünyasının en popüler yapısıdır. Bu yapılara örnek olarak; JMS, MSMQ, RabbitMQ, Kafka etc. verebiliriz ve  genel çerçeveden baktığımızda messaging queue'ler bir sender-receiver şeklinde çalışırlar.

RabbitMQ Nedir

RabbitMQ, Erlang dili ile open-source olarak geliştirilen ve Open Telecom Platform kütüphanesi üzerinde build edilebilen günümüz server-to-server/app-to-app communication ihtiyaçları konusunda giderek popüler olan hızlı bir messaging queue yapısıdır. Advanced Message Queuing Protocol (AMQP) implement ederek uygulamalar ve server'lar arası veri alışverişini sağlar.

Rabbitmq Publisher ve Consumer mantığıyla çalışır. Örneğin data-exchange yapmanız gereken bir iş var bunu rabbitmq üzerinden publisher'ile ilgili queue'ya publish edip sonrasında bu queue'yu consume edecek bir consumer projesi oluşturup yapmak istediğimiz bu işlemi consumer'a yaptırabiliriz. Bu işlemler genelde ana uygulama üzerinde yapmak istemeyeceğimiz yükü fazla olan işlemler olabilir.

Daha basit anlatacak olursak; sunucu RabbitMQ sunucusuna bir message gönderir ve sunucu bu mesajı ilgili queue'ya yönlendirir. Sonrasında başka bir uygulama bu queue'yu dinler ve FIFO mantığıyla çalışan kuyruktaki bu mesajları consume ederek süreci sonlandırır. Sahip olduğu Web Management Interface ile de bulunan queue'ları görüntüleyebilme, requeue etme, delete, purge gibi daha bir çok işlemi yapabilmemizi sağlar.

Queue yapıları aslında projelerimiz için birer middleware görevi görmektedir. Ana uygulamanızdan queue ya push ettiğiniz message Consumer down olsa dahi o message'ı queue'da bekletir ve consumer tekrardan start olduğunda o message'ı tekrar tekrar push etmeye çalışır ve böylelikle veri kaybının da önüne geçmemizi sağlar. 

Bazı term'lere bakacak olursak;

Producer/Publisher : Queue'ya message'ı gönderen yapıya verilen isimdir.
Consumer : Queue'yu dinleyerek ilgili message'ları receive eden yapıdır.                                                                                                                                               Queue : First-in-first-out mantığıyla çalışan kuyruk yapımız.
Exchange : Routing yani kuyruğa iletilen message'ı route eden yapıdır ve routing işlemini yapan çeşitli yapılar bulunmaktadır.

 

Kurulum

İlk olarak rabbitmq erlang dili ile geliştirildiğinden makinamızda sahip olduğumuz işletim sistemine göre uygun versiyona ait erlang dosyalarını erlang.org sitesinden indirip kuralım.

 

Sonrasında ise rabbitmq.com sitesinden Windows için güncel rabbitmq server versiyonunu indirip kuralım.

Kurulumlar sorunsuz bir şekilde tamamlandıktan sonra yukarıda da bahsettiğimiz web arayüzünü aktifleştirelim. Bunun için rabbimq sbin dosyası içerisinde olan RabbitMQ Command Prompt'ı çalıştıralım ve aşağıdaki komut satırını yazalım.

> rabbitmq-plugins enable rabbitmq_management

RabbitMQ windows makinada Windows Service olarak çalışır ve yukarıda yazdığımız web arayüzünü enable etme komutunun hemen çalışabilmesi için rabbitmq'yu aşağıdaki gibi stop/start edelim.

> net service stop RabbitMQ
.....
> net service start RabbitMQ

Şimdi ise browser üzerinden http://localhost:15672/ adresine giderek login için default credential'lar username/password guest/guest olarak girdikten sonra aşağıdaki gibi arayüzü göreceğiz.

Şuan mevcutta herhangi bir queue oluşturmadığımızdan üstte bulunan exchange tab'ına tıkladığımızda aşağıdaki gibi default exchange listesini görüntüleyebiliriz.

RabbitMQ Windows Üzerinden kurulumu bu kadardı.

Sonraki yazılarda Exchange tool'larındanbirini kullanarak basit bir publisher consumer örneği ile devam edeceğiz.                               

                       

 

OptimisticLock using Fluent NHibernate

OptimisticLock ve PessimisticLock konuları hakkında Optimistic Lock Nedir ? Pessimistic Lock Nedir ? Data concurrency yazımızda bahsetmiştik. Kısaca hatırlatmak gerekirse;farklı thread'ler de aynı row üzerinde işlem yapılırken herhangi bir lock işlemi olmadan update edilmek istenen verinin bayat olup olmadığını o verinin kayıtlı olduğu tabloda yer alan versiyon numarası olarak da adlandırılan bir column'da bulunan değeri kontrol eder ve eğer versiyon eşleşmiyorsa yani veri bayat ise işlem geri çekilir.

Bu yazıda ise Nhibernate kullanarak optimistic lock nasıl yapılır bunu inceleyeceğiz. Daha önceki Unit of Work Interceptor, Castle Windsor, NHibernate ve Generic Repository yazısında geliştirdiğimiz proje üzerinden ilerleyelim. Bir web api projesi oluşturmuştuk ve nuget üzerinden Fluent Nhibernate'i yüklemiştik. İçerisinde User ve Address adında iki tane tablomuz bulunuyordu. Nhibernate için optimistic lock konfigurasyonu mapping işlemi yapılırken belirtiliyor. Bizde öncelikle versiyonlamak veya optimistic lock uygulamak istediğimiz entity'ler için bir base model oluşturalım.

    public abstract class VersionedEntity
    {
        public virtual int EntityVersion { get; set; }
    }

User modelimiz ise yukarıda tanımladığımız modelden inherit olsun ve aşağıdaki gibi UserMapping.cs içerisinde konfigurasyonlarımızı yapalım.

 

    public class User : VersionedEntity
    {
        public virtual int Id { get; set; }
        public virtual string Name { get; set; }
        public virtual string SurName { get; set; }
    }

    public class UserMap : ClassMap<User>
    {
        public UserMap()
        {
            Table("Users");
            Id(x => x.Id);
            Map(x => x.Name);
            Map(x => x.SurName);

            // versiyon işlemi için kullanılacak column
            Version(X => X.EntityVersion);
            
            // optimistic lock'ı versiyonlama üzerinden aktif hale getiriyoruz
            OptimisticLock.Version();
        }
    }

Database de Users tablomuzda EntityVersion adında bir column yaratılacak ve bu column o row için yapılan her bir update işleminde 1 artacaktır.

Konfigurasyon işlemi bu kadar şimdi test yapalım. Aşağıdaki gibi AddnewUser metoduna postman üzerinden sırayla 1 insert 2 get 2 put(update) request'i atalım.

İlk insert işlemi sonrasında db deki kayıt aşağıdaki gibi EntityVersion= 1 şeklinde olacaktır.

Sonrasında ardı ardına 2 get işlemi yapıp db deki kaydı alalım ve sonrasındaki ilk update işlemi sonrasında kaydımız aşağıdaki gibi EntityVersion = 2 şeklinde güncellenecektir.

İkinci get işlemini yapan transaction için yani üstte update yapılmışken eline stale/bayat veriye sahipken update işlemi yapmaya çalıştığında diğer bir değişle db de ki EntityVersion = 2 iken ikinci işlemin elinde EntityVersion = 1 olan kayıt varken update yapmaya çalıştığında aşağıdaki gibi bir exception throw edilir.

Hata mesajı bize o row'un bize başkabir transaction tarafından update veya delete edildiğini belirtmekte. Bu durumu yaşamamak için ikinci işlem için tekrardan db de bulunan kayıt get edilip üzerinden bir update işlemi yapıldığında db deki son görüntüsü aşağıdaki gibi EntityVersion = 3 şeklinde olacaktır.

 

Optimistic Lock için yazımız buraya kadar. Yukarıda da belirttiğim gibi örnek kodlar Unit of Work Interceptor, Castle Windsor, NHibernate ve Generic Repository yazısında bulunmakta. Eksik kalan yerler için ordan devam edebilirsiniz.

Unit of Work Interceptor, Castle Windsor, NHibernate ve Generic Repository

Unit of Work Pattern Martin Fowler'ın 2002 yılında yazdığı Patterns of Enterprise Application Architecture kısaca PoEAA olarak da adlandırılan kitabında bahsetmesiyle hayatımıza girmiş bir pattern dır.

M.Fowler kitabında UoW'ü şu şekilde tanımlar,

Maintains a list of objects affected by a business transaction and coordinates the writing out of changes and the resolution of concurrency problems.

Unit of Work; database'de execute etmemiz gereken bir dizi işlemin yani birden fazla transaction'a ihtiyaç duyarak yapacağımız işlemler (Create, Update,  Insert, Delete, Read) dizinini success veya fail olması durumunda tek bir unit yani tek bir birim olarak ele alıp yönetilmesini sağlayan pattern dir.

Diğer bir değişle; ardı ardına çalışması gereken 2 sql transaction var ve bunlardan biri insert diğeride update yapsın. İlk olarak insert yaptınız ve hemen sonrasında update sorgusunu çalıştırdınız fakat update işlemi bir sorun oluştu ve fail oldu. Unit of work tam da bu sırada araya girerek bu iki işlemi bir birimlik bir işlem olarak ele alır ve normal şartlarda ikisininde success olması durumunda commit edeceği sessino'ı update işlemi fail verdiğinden ilk işlem olan insert'ü rollback yapar ve db de yanlış veya eksik kayıt oluşmasını engeller. Yada ikiside success olduğunda session'ı commit ederek consistency'i sağlar.

Örnek üzerinden ilerleyecek olursak; bir data-access katmanımız olsun ve ORM olarak da NHibernate'i kullanıyor olalım. Projemizde IoC container olarak da Castle Windsor'ı entegre edelim. İlk olarak Vs'da "UoW_Sample" adında bir Empty Asp.Net Web Api projesi oluşturalım ve sonrasında nugetten Sırasıyla Fluent-NHibernate ve Castle Windsor'ı yükleyelim.

Case'imiz şu şekilde olsun; User ve Address adında tablolarımız var ve AddNewUser adında bir endpoint'ten hem kullanıcı hemde address bilgileri içeren bir model alarak sırasıyla User'ı ve Address'i insert etmeye çalışalım. User'ı insert ettikten sonra Address insert sırasında bir sorun oluşsun ve UoW araya girerek kaydedilecek olan user'ı da rollback yapsın.

Öncelikle User ve Address modellerimizi aşağıdaki gibi oluşturalım.

public class User
   {
       public virtual int Id { get; set; }
       public virtual string Name { get; set; }
       public virtual string SurName { get; set; }
   }
public class Address
   {
       public virtual int Id { get; set; }
       public virtual string CityCode { get; set; }
       public virtual string DistrictCode { get; set; }
       public virtual string Description { get; set; }
       public virtual int UserId { get; set; }
   }

Bu modellere ait Nhibernate Mapping'lerini de aşağıdaki gibi oluşturalım.

public class UserMap : ClassMap<User>
{
    public UserMap()
    {
        Id(x => x.Id);
        Map(x => x.Name);
        Map(x => x.SurName);
        Table("Users");
    }
}
public class AddressMap : ClassMap<Address>
{
    public AddressMap()
    {
        Id(x => x.Id);
        Map(x => x.CityCode);
        Map(x => x.DistrictCode);
        Map(x => x.Description);
        Map(x => x.UserId);
        Table("Address");
    }
}

Repository kullanımı için aşağıdaki gibi generic repo class'larını oluşturalım. Bu arayüz üzerinden db de bulunan tablolarımız için CRUD işlemlerini yapacağız.

    public interface IRepository<T> where T : class
    {
        T Get(int id);
        IQueryable<T> SelectAll();
        T GetBy(Expression<Func<T, bool>> expression);
        IQueryable<T> SelectBy(Expression<Func<T, bool>> expression);
        int Insert(T entity);
        void Update(T entity);
    }
  public abstract class BaseRepository<T> : IRepository<T> where T : class
    {
        public ISessionFactory SessionFactory { get; private set; }

        public ISession _session
        {
            get { return this.SessionFactory.GetCurrentSession(); }
        }

        public BaseRepository(ISessionFactory sessionFactory)
        {
            SessionFactory = sessionFactory;
        }

        public T Get(int id)
        {
            return _session.Get<T>(id);
        }

        public IQueryable<T> SelectAll()
        {
            return _session.Query<T>();
        }

        public T GetBy(Expression<Func<T, bool>> expression)
        {
            return SelectAll().Where(expression).SingleOrDefault();
        }
        public IQueryable<T> SelectBy(Expression<Func<T, bool>> expression)
        {
            return SelectAll().Where(expression).AsQueryable();
        }

        public int Insert(T entity)
        {
            var savedId = (int)_session.Save(entity);
            _session.Flush();
            return savedId;
        }

        public void Update(T entity)
        {
            _session.Update(entity);
            _session.Flush();
        }
    }

Tablolarımıza karşılık UserRepository ve AddressRepository class'larını arayüzleri ile birlikte aşağıdaki gibi tanımlayalım.

    public interface IUserRepository : IRepository<User>
    { }

    public class UserRepository : BaseRepository<User>, IUserRepository
    {
        public UserRepository(ISessionFactory sessionFactory) : base(sessionFactory)
        {
        }
    }
    public interface IAddressRepository : IRepository<Address>
    { }

    public class AddressRepository : BaseRepository<Address>, IAddressRepository
    {
        public AddressRepository(ISessionFactory sessionFactory) : base(sessionFactory)
        {
        }
    }

Repository'lerimiz direkt olarak api'ın controller'ları ile haberleşmesini istemediğimizden bir service katmanımızın olduğunu düşünerek UserService adında doğrudan Repository'ler ile iletişim kurabilen class'ımızı oluşturalım ve Unit Of Work interceptor'ı da bu service class'ları seviyesinde container'a inject edeceğiz.

Projede yer alan service'leri bir çeşit flag'lemek adına IApiService adında bir base interface tanımlayalım.Bu interface'i daha sonrasında container'a bütün service'leri register etmede de kullanacağız.

    public interface IApiService
    {   }

    public interface IUserService : IApiService
    {
        void AddNewUser(AddNewUserRequest reqModel);
    }
    public class UserService : IUserService
    {
        private readonly IUserRepository _userRepository;
        private readonly IAddressRepository _addressRepository;

        public UserService(IUserRepository userRepository, IAddressRepository addressRepository)
        {
            _userRepository = userRepository;
            _addressRepository = addressRepository;
        }

        public void AddNewUser(AddNewUserRequest reqModel)
        {
            var user = new User { Name = reqModel.User.Name, SurName = reqModel.User.SurName };
            var userId = _userRepository.Insert(user);

            var address = new Address { UserId = userId, CityCode = reqModel.Address.CityCode, Description = reqModel.Address.Description, DistrictCode = reqModel.Address.DistrictCode };
            _addressRepository.Insert(address);
        }
    }

    public class AddNewUserRequest
    {
        public UserDto User { get; set; }
        public AddressDto Address { get; set; }
    }
    public class UserDto
    {
        public string Name { get; set; }
        public string SurName { get; set; }
    }
    public class AddressDto
    {
        public string CityCode { get; set; }
        public string DistrictCode { get; set; }
        public string Description { get; set; }
    }

Yukarıda end-point'imizin alacağı request model ve onun dto class'larını da oluşturduk. Şimdi ise api end-point'imizi tanılayalım.  UserController adında client'ların call yapacağı controller'ımız aşağıdaki gibi olacaktır.

    public class UserController : ApiController
    {
        private readonly IUserService _userService;

        public UserController(IUserService userService)
        {
            _userService = userService;
        }

        [HttpPost]
        public virtual HttpResponseMessage AddNewUser(AddNewUserRequest reqModel)
        {
            _userService.AddNewUser(reqModel);
            return Request.CreateResponse();
        }
    }

Geliştirmemiz gereken 2 yer kaldı Castle Windsor implementasyonu ve UnitOfWork Interceptor oluşturulması. Projemizde her şeyi interface'ler üzerinden yaptık ve constructor injection'dan faydalandık. Şimdi ise Repository, Service ve Controller'lar için bağımlılıkları enjekte edelim ve UnitOfWork Interceptor'ı oluşturalım. 

İlk olarak NHibernateInstaller.cs'i tanımlayalım. Burda web.config/app.config dosyamızda "ConnString" key'i ile kayıtlı database conenction string'imiz olduğunu varsayalım ve aşağıdaki gibi tanımlamalarımızı yapalım.

    public class NHibernateInstaller : IWindsorInstaller
    {
        public void Install(IWindsorContainer container, IConfigurationStore store)
        {
            var sessionFactory = Fluently.Configure()
               .Database(MsSqlConfiguration.MsSql2012.ConnectionString(c => c.FromConnectionStringWithKey("ConnString")).ShowSql())
               .Mappings(m => m.FluentMappings.AddFromAssemblyOf<UserMap>())
               .ExposeConfiguration(cfg => new SchemaUpdate(cfg).Execute(false, true))
                        .ExposeConfiguration(cfg =>
                        {
                            cfg.CurrentSessionContext<CallSessionContext>();
                        })
               .BuildSessionFactory();

            container.Register(
                Component.For<ISessionFactory>().UsingFactoryMethod(sessionFactory).LifestyleSingleton());
        }
    }

İkinci olarak RepositoryInstaller.cs'i oluşturalım. Bu installer ile projemizde bulunan bütün repository interfacelerini ve onların implementasyonlarını container'a register etmiş olucaz. Her bir repository'i ayrı ayrı register etmek yerine bütün repository'lerimiz IRepository interface'in den türediğinden container'a IRepository'i implement eden bütün class'ları register etmesini belirteceğiz.

    public class RepositoryInstaller : IWindsorInstaller
    {
        public void Install(IWindsorContainer container, IConfigurationStore store)
        {
            container.Register(
                Classes.FromThisAssembly()
                    .Pick()
                    .WithServiceAllInterfaces()
                    .LifestylePerWebRequest()
                    .Configure(x => x.Named(x.Implementation.Name))
                          .ConfigureIf(x => typeof(IRepository<>).IsAssignableFrom(x.Implementation), null));
        }
    }

Üçüncü olarak ServiceInstaller.cs class'ını tanımlayalım ancak öncesinde yukarıda da belirttiğimiz gibi UnitOfWork'ü service seviyesinde container'a register edeceğiz. Sebebi ise repository'e erişimimiz service class'ları üzerinden olması. UnitOfWork'ü de interceptor olarak yaratacağız ve böylelikle service metoduna girerken session'ı bind edip metot içerisinde herhangi bir exception aldığında rollback yapacağız yada herhangi bir sorun yoksada session'ı commit edip query'leri execute etmesini sağlayacağız. Aşağıda ilk olarak unitofwork manager ve interceptor class'larını oluşturalım.

    public interface IUnitOfWorkManager
    {
        void BeginTransaction();
        
        void Commit();
        
        void Rollback();
    }
    public class UnitOfWorkManager : IUnitOfWorkManager
    {
        public static UnitOfWorkManager Current
        {
            get { return _current; }
            set { _current = value; }
        }
        [ThreadStatic]
        private static UnitOfWorkManager _current;
        
        public ISession Session { get; private set; }
        
        private readonly ISessionFactory _sessionFactory;
        
        private ITransaction _transaction;
        
        public UnitOfWorkManager(ISessionFactory sessionFactory)
        {
            _sessionFactory = sessionFactory;
        }
        
        public void BeginTransaction()
        {
            Session = _sessionFactory.OpenSession();
            CurrentSessionContext.Bind(Session);
            _transaction = Session.BeginTransaction();
        }

        public void Commit()
        {
            try
            {
                _transaction.Commit();
            }
            finally
            {
                Session.Close();
            }
        }

        public void Rollback()
        {
            try
            {
                _transaction.Rollback();
            }
            finally
            {
                Session.Close();
            }
        }
    }

 Yukarıda oluşturduğumuz manager'ı kullanarak UnitOfWorkInterceptor'ı da aşağıdaki gibi tanımlayalım.

    public class UnitOfWorkInterceptor : Castle.DynamicProxy.IInterceptor
    {
        private readonly ISessionFactory _sessionFactory;

        public UnitOfWorkInterceptor(ISessionFactory sessionFactory)
        {
            _sessionFactory = sessionFactory;
        }

        public void Intercept(IInvocation invocation)
        {
            try
            {
                UnitOfWorkManager.Current = new UnitOfWorkManager(_sessionFactory);
                UnitOfWorkManager.Current.BeginTransaction();

                try
                {
                    invocation.Proceed();
                    UnitOfWorkManager.Current.Commit();
                }
                catch
                {
                    UnitOfWorkManager.Current.Rollback();
                    throw new Exception("Db operation failed.");
                }
            }
            finally
            {
                UnitOfWorkManager.Current = null;
            }
        }
    }

Yukarıda tanımladığımız interceptor'ı aşağıdaki gibi service'leri register ederken bu service class'larına ait metotlar için UnitOfWorkInterceptor'ı configure etmesini belirteceğiz.

    public class ServiceInstaller : IWindsorInstaller
    {
        public void Install(IWindsorContainer container, IConfigurationStore store)
        {
            container.AddFacility<TypedFactoryFacility>();

            container.Register(
                Classes.FromAssemblyContaining<UserService>()
                    .Pick()
                    .WithServiceAllInterfaces()
                    .LifestylePerWebRequest()
                    .Configure(x => x.Named(x.Implementation.Name))
                          .ConfigureIf(x => typeof(IApiService).IsAssignableFrom(x.Implementation),
                            y => y.Interceptors<UnitOfWorkInterceptor>()));

        }
    }

Projemiz bir Web Api projesi olduğundan controller'lar ile ilgili container registration işlemleri için gerekli olan WebApiControllerInstaller.cs class'ı ve ControllerActivator.cs class'ı tanımlamaları da aşağıdaki gibidir.

    public class ApiControllerActivator : IHttpControllerActivator
    {
        private readonly IWindsorContainer _container;

        public ApiControllerActivator(IWindsorContainer container)
        {
            _container = container;
        }

        public IHttpController Create(
            HttpRequestMessage request,
            HttpControllerDescriptor controllerDescriptor,
            Type controllerType)
        {
            var controller =
                (IHttpController)this._container.Resolve(controllerType);

            request.RegisterForDispose(
                new Release(
                    () => this._container.Release(controller)));

            return controller;
        }

        private class Release : IDisposable
        {
            private readonly Action _release;

            public Release(Action release)
            {
                _release = release;
            }

            public void Dispose()
            {
                _release();
            }
        }
    }
    public class WebApiControllerInstaller : IWindsorInstaller
    {
        public void Install(IWindsorContainer container, IConfigurationStore store)
        {
            container.Register(Classes.FromThisAssembly()
                .BasedOn<ApiController>()
                .LifestylePerWebRequest());
        }
    }

Geldik son adıma. Yukarıda tanımladığımız bütün installer class'larını container'a install etmeye. Bunun için projede yer alan Global.asax.cs içerinde yer alan Application_Start metodu içerisine aşağıdaki gibi installation işlemlerini yapalım.

        protected void Application_Start()
        {
            var container = new WindsorContainer();
            container.Register(Component.For<UnitOfWorkInterceptor>().LifestyleSingleton());
            container.Install(new ServiceInstaller());
            container.Install(new RepositoryInstaller());
            container.Install(new NHibernateInstaller());
            container.Install(new WebApiControllerInstaller());
            GlobalConfiguration.Configuration.Services.Replace(
                typeof(IHttpControllerActivator),
                new ApiControllerActivator(container));
            GlobalConfiguration.Configure(WebApiConfig.Register);
        }

Postman üzerinden aşağıdaki gibi end-point'imize call yapalım ve hem iki insert işlemininde başarılı olduğu case'i hemde user insert başarılı olduktan sonra address insert sırasında bir hata verdirip ilk işleminde rollback olduğu case'i oluşturup gözlemleyebiliriz.

Unit of Work pattern gözlemlediğim kadarıyla genellikle projede her query execution sırasında o satırları try-catch e alarak değişik logic'ler uygulanarak yapılıyor ancak. Aspect oriented'ın bize sağladıklarından faydalanarak bir interceptor ile projede her yerde kullanabileceğimiz basit bir infrastructure geliştirebiliriz. Bu pattern ile aynı işleve hizmet eden birden fazla küçük küçük db transaction'ını tek bir unit olarak yönetip dirty data'nın da önüne geçmiş oluyoruz.

Nhibernate IPreInsertEventListener ve IPreUpdateEventListener Kullanımı

Server-side bir projede geliştirme yapıyorsanız ve db de bolca CRUD işlemleri için query'ler çalıştırmanız gerekiyorsa sizden db de kaydedilen o row için sizden insert veya update anında bazı bilgileri otomatik bir şekilde o row için kaydetmeniz istenebilir. Örnek olarak; CreatedDate veya update edilen değer için ModifiedDate gibi alanlar tutmanız muhakkak istenir istenmese dahi bu bilgileri ilgili colum'lar da tutmak muhakkak bir gün işinize yarayacaktır.

Eğer CRUD işlemlerini Ado.Net kullanarak yapıyorsanız query'nin sonuna bu değerleri ekleyebilir yada stored-procedure kullanıyorsanız da bu işlemleri sp içerisinde de yapabiliriz.

Bu yazımızda bu ve benzeri işlemleri Fluent-Nhibernate kullanarak nasıl yapabiliriz konusuna değineceğiz. 

Her defasında yeni kayıt geldi modeli initialize ederken CreatedDate alanına DateTime.Now set et, yada her update işlemi geldiğinde ModifiedDate alanına DateTime.Now alanını set et. Pek de hoş durmuyor sanki. 50'ye yakın db de tablonuz olduğunu düşünün her bir entity için gidip bu işlemleri heralde yapmak istemeyiz .

Eğer proejenizde NHibernate'i kullanıyorsanız Nhibernate bu işlemler için bizlere aşağıdaki interface'leri sunmakta.

  • IPreInsertEventListener
  • IPreUpdateEventListener

IPreInsertEventListener; adında da anlaşılacağı üzre entity'niz insert edilirken bir interceptor gibi araya girmemizi sağlayan ve insert query execution'dan OnPreInsert adındaki metoduna invoke edilerek entity'niz üzerinde işlemler yapmanızı sağlar.

IPreUpdateEventListener; ise bir update listener'ı dır ve içerisinde implement edebildiğimiz OnPreUpdate  metodu çağrılır update query'sinin execution'dan önce call edilerek yine entity üzerinde değişiklikler yapabilmemizi sağlar.

Örnek olarak bir BaseModel'miz olsun ve projemizde bulunan her bir entity için tablolarda ortak bulunan alanları bu class içerisinde tanımlayabiliriz.

    public abstract class BaseModel
    {
        public virtual Guid Id { get; set; }
        public virtual DateTime? CreatedDate { get; set; }
        public virtual DateTime? ModifiedDate { get; set; }
    }

Db de bulunan tablolarımıza karşılık gelen entity'lerimiz ise yukarıda tanımladığımız BaseModel class'ından inherit olacaklar. Örnek olarak Customer adında aşağıdaki gibi bir entity tanımlayalım.

    public class Customer : BaseModel
    {
        public virtual string FirstName { get; set; }
        public virtual string LastName { get; set; }
        public virtual string Email { get; set; }
    }

Şimdi ise CustomerMap class'ını oluşturacaz ancak öncesinde BaseModel içerisinde bulunan proeprty'ler için BaseMapping adında bir class tanımlayalım. Customer ve diğer db modellerimizde bu BaseMapping'i kullanarak map işlemlerini yapacağız. Bunu yapmamızdaki amaç her bir entity için ayrı ayrı gidip BaseModel içerisinde bulunan alanların map'ini yapmamak. 

    public class BaseMapping<T> : ClassMap<T> where T : BaseModel
    {
        public BaseMapping()
        {
            Id(x => x.Id);
            Map(x => x.CreatedDate);
            Map(x => x.ModifiedDate);
        }
    }

 

Artık BaseMapping 'i kullanarak CustomerMap class'ını oluşturabiliriz. 

    public class CustomerMap : BaseMapping<Customer>
    {
        public CustomerMap()
        {
            Map(x => x.FirstName);
            Map(x => x.LastName);
            Map(x => x.Email);
        }
    }

Sırada Listener class'ını oluşturmak var. Aşağıda NHInsertUpdateListener adında bir class tanımlayalım. Yazının başında bahsettiğimiz her tablomuzda bulunan CreatedDate ve ModifiedDate tarih alanlarını NHInsertUpdateListener içerisinde set edeceğiz.

    public class NHInsertUpdateListener : IPreInsertEventListener, IPreUpdateEventListener
    {
        public bool OnPreUpdate(PreUpdateEvent @event)
        {
            var audit = @event.Entity as BaseModel;
            if (audit == null)
                return false;

            var time = DateTime.Now;

            Set(@event.Persister, @event.State, "CreatedDate", time);

            audit.CreatedDate = time;

            return false;
        }

        public bool OnPreInsert(PreInsertEvent @event)
        {
            var audit = @event.Entity as BaseModel;
            if (audit == null)
                return false;


            var time = DateTime.Now;

            Set(@event.Persister, @event.State, "ModifiedDate", time);

            audit.ModifiedDate = time;

            return false;
        }

        private void Set(IEntityPersister persister, object[] state, string propertyName, object value)
        {
            var index = Array.IndexOf(persister.PropertyNames, propertyName);
            if (index == -1)
                return;
            state[index] = value;
        }
    }

Artık son adım olarak FluentNHibernate'i ayağa kaldırmak var. Nh configuration'ı aşağıdaki gibi tanımlayabiliriz.

Fluently.Configure()
               .Database(MsSqlConfiguration.MsSql2012.ConnectionString(c => c.FromAppSetting("dbConnectionString")).ShowSql())
               .Mappings(m => m.FluentMappings.AddFromAssemblyOf<CustomerMap>())
               .ExposeConfiguration(cfg => new SchemaUpdate(cfg).Execute(false, true))
               .ExposeConfiguration(cfg =>
               {
                   cfg.SetProperty(
                      NHibernate.Cfg.Environment.CurrentSessionContextClass,
                       "web");
                   cfg.AppendListeners(ListenerType.PreUpdate, new IPreUpdateEventListener[] { new NHInsertUpdateListener() });
               })
               .BuildSessionFactory();

 

Sırasıyla yazmak gerekirse neler yaptık;

  1. Vs da bir tane proje oluşturduk. (Console veya Api),
  2. FluentNHibernate paketini nuget'ten indirip kurduk,
  3. Bir db miz olduğunu ve connection string bilgisinin web config'de tanımlı olduğunu varsaydık,
  4. Tablolarda ortak kullanılan propert'leri BaseModel adında ki class da topladık,
  5. Daha sonra BaseMapping adında bir mapping tanımlaması yaparak entity içerisindeki property'leri map ettik,
  6. CustomerMap class'ını oluşturarak mapping işlemini tanımladık,
  7. NHInsertUpdateListener'ı oluşturduk ve CreatedDate - ModifiedDate alanları için değerleri set ettik.
  8. Fluent Nhibernate konfigurasyonunu oluşturduk.

 Listener'lar biraz az bilinen bir özellik gibi görünse de oldukça faydalıdırlar. Örnekte olduğu gibi benzer case'lerde kullanarak bizleri satırlarca tekrar eden kodlardan uzaklaştırır.

StructureMap Nedir ? WebApi ile Kullanımı

Daha önceki IoC container yazılarında Ninject ve Windsor 'dan bahsetmiştik. Bu yazımızda ise 2016 benchmark'larına göre en hızlı IoC container olduğu söylenen StructureMap'i WebApi üzerinde örnek proje ile inceleyeceğiz. 

StructureMap ilk release'i .Net framework 1.1 için 2004 yılında çıkmış ve 12 yıldır hayatımızda olan en eski IoC/DI tool'u dur. Uygulama genelindeki instance yönetiminden sorumlu olup bağımlılıkları enjecte edebilmemizi sağlar.

Yazımızda StructureMap kullanarak basit bir infrastructure tasarlamaya çalışacağız. 

İlk olarak Vs'da bir Web Api projesi oluşturalım.

Sonrasında projemize nuget üzerinden StructureMap.WebApi2 paketini install edelim.

Kurulum işlemi tamamlandıktan sonra solution'da DependencyResolution adında auto generate olan bir klasör ve hem bu klasör içerisinde hemde App_Start klasörü içerisinde StructureMap konfigurasyonlarını yapabilmemizi sağlayacak olan class'ları göreceğiz. Şimdilik bu klasörü es geçelim projemizi hazır hale getirdikten sonra gerekli register işlemlerini yaparız. 

Örnek case'imiz şu şekilde olsun; UserController adında bir controller oluşturalım ve bu controller içerisinde tanımlı IUserService intercase'ini contructor injection yöntemi ile inject edelim. GetUserFullNames adında end-point ile geriye List of string dönelim.

    public class UserController : ApiController
    {
        private readonly IUserService _userService;
        public UserController(IUserService userService)
        {
            _userService = userService;
        }

        [HttpGet]
        public HttpResponseMessage GetUserFullNames()
        {
            var response = _userService.GetUserFullNames();

            return Request.CreateResponse(response);
        }
    }

Controller içerisinde kullandığımız IUserService interface'i ve onun implemantasyonunu aşağıdaki gibi oluşturalım.

public interface IUserService
{
    List<string> GetUserFullNames();
}
 
public class UserService : IUserService
{
    public List<string> GetUserFullNames()
    {
        return new List<string>
                              { "Olcay Şahan",
                                "Anderson Talisca",
                                "Oğuzhan Özyakup",
                                "Ricardo Quaresma",
                                "Cenk Tosun" };
    }
}

Yukarıda da söylediğimiz gibi UserController içerisindeki GetUserFullNames metodu HttpGet isteği alarak geriye UserService içerisinde bulunan GetUserFullNames metodunun return ettiği List of string'i dönecektir.

Örneğimiz hazır. Şimdi ise geriye son 2 adım kaldı.

1-) StructureMap ile ilgili container konfigurasyonlarını yapmak. DependencyResolution klasörü içerisinde bulunan DefaultRegistry adlı class'a gidip aşağıdaki gibi UserService'i container'a register edeceğiz.

    public class DefaultRegistry : Registry {
        #region Constructors and Destructors

        public DefaultRegistry() {
            Scan(
                scan => {
                    scan.TheCallingAssembly();
                    scan.WithDefaultConventions();
                });
            For<IUserService>().Use<UserService>();
        }

        #endregion
    }

2-) Son olarak App_Start/WebApiConfig.cs class'ına gidip DI container'ı start etmemiz gerekiyor. 

    public static class WebApiConfig
    {
        public static void Register(HttpConfiguration config)
        {
            // Web API configuration and services

            // Web API routes
            config.MapHttpAttributeRoutes();

            config.Routes.MapHttpRoute(
                name: "DefaultApi",
                routeTemplate: "api/{controller}/{id}",
                defaults: new { id = RouteParameter.Optional }
            );
            
            //structureMap start
            StructuremapWebApi.Start();
        }
    }

Hem örneğimiz hemde container ile ilgili konfigurasyonlarımız hazır. Artık projemizi run edip yazdığımız kodları test edebiliriz. Bunun için projemizi run edelim ve yazmış olduğumuz http-Get end-point'ini browser da çağıralım.

Yukarıda görüldüğü gibi UserController da bulunan end-point'e call yaptık ve bize container'da bulunan IUserService'ine ait olan implementasyonu resolve edip UserService içerisinde bulunan GetUserFullNames metodunu execute edip geriye user listesini return etti

 StructureMap yazımız şimdilik bu kadar. İlerleyen günlerde daha farklı DI Container yazılarına devam edeceğiz.

Optimistic Lock Nedir ? Pessimistic Lock Nedir ? Data concurrency

Db'si olan ve son kullanıcı tarafından CRUD işlemlerinin bolca yapılabildiği bir proje geliştiriyorsanız veri tutarlılığı sizin için oldukça önemli bir hal almak durumundadır. Kayıtlı olan veriyi son kullanıcıya ulaştırabilip en güncel veri üzerinden transaction'ları işleyebilmek ve stale yada dirty data olarak da adlandırılan bayat veriyi handle edip kullanıcının erişmesini engellemek oldukça önemlidir. 

Transactional operasyonlar Concurrency'yi sağlayabilmek adına genelde üzerinde işlem yapılan veriye lock işlemi uygulanarak gerçekleştirilirler. Bu lock işlemi için 2 farklı yaklaşım vardır. Pessimistik Lock ve Optimistic Lock.

Pessimistic Lock

O an işlem gerçekleşirken üzerinde çalışılan kayıt lock'lanır ki o anda başka birisi o kayıt üzerinde değişiklik yapmasın. Bu işlem session bazlı olur ve transaction başlarken açılan session sonlandırılıncaya veya rollback yapılıncaya kadar işlem yapılan row db de lock'lanır. Örnek olarak bir bankacılık uygulaması düşünün ve bir hesaba aynı anda hem para çekme hemde para yatırma işlemi geldi. İlk para yatırma işlemini yapan thread öncelikli düşündüğümüzde bu transaction'ı gerçekleştiren session o account'u işlem sonlanıncaya kadar lock'lar ve para çekme işlemini bekletir. Transaction sonlandıktan sonra diğer thread'in gerçekleştireceği para çekme işleminin session'nını açarak güncel veri üzerinden işlemlerin gerçekleşmesine olanak sağlar. Bunu yapmasındaki amaç güncel veri üzerinden transaction'ı geçirip oluşabilecek kayıpları engellemektir. Ancak pessimistic lock'ın deadlock'lara sebep olabileceğini de unutmayalım.

 

Optimistic Lock

Optimistic Lock ise, adından da anlaşılacağı üzre "iyimser" birden fazla işlemin birbirini etkilemeden gerçekleşeceğini ve kimsenin kimse üzerinde bir lock koymayacağını söyler. Diğer bir deyişle farklı thread'ler de aynı row üzerinde işlem yapılırken herhangi bir lock işlemi olmadan update edilmek istenen verinin bayat olup olmadığını o verinin kayıtlı olduğu tabloda yer alan versiyon numarası olarak da adlandırılan bir column'da bulunan değeri kontrol eder ve eğer versiyon eşleşmiyorsa yani veri bayat ise işlem geri çekilir.

Peki bayat(stale) data ne demek ?

Örnek üzerinden anlatacak olursak; bir internet sitesinde kayıtlı bulunan adres bilginizi güncellemek istiyorsunuz. Aynı anda 2 farklı bilgisayardan bilgileri güncelle sayfasını açtınız ve adresiniz o an "Samsun" olarak kayıtlı yani 2 ekranda da "Samsun" yazıyor. İlk bilgisayarda bulunan kişi adres bilgisini "Ankara" olarak değiştirdi ve güncelle butonuna basıp bilgiyi güncelledi.

İkinci ekranda bulunan kişi ise ekranda halen "Samsun" yazılı iken adres bilgisini "İstanbul" olarak değiştirdi ve güncelle butonuna basıp bilgiyi güncelledi. Ekranda yazan "Samsun" kaydı artık bizim için bayat bir kayıttır ve birinci kullanıcı değişikliği "Samsun" => "Ankara" yaptığını düşünürken ikinci kişi bu değişikliği "Samsun" => "İstanbul" yaptığını düşünüyor. Halbuki gerçekte olan ikinci kişi adres bilgisi ekranda "Ankara" iken => "İstanbul" olarak değiştirmiş oldu.

Ne oldu ? Pek de istemediğimiz bir case oluştu. İkinci kullanıcı Samsun olan kaydı İstanbul yaptığını düşünürken aslında Ankara olan kaydı İstanbul yaptı. Yani stale olan kaydı güncellemiş oldu.

Optimistic Lock ile ikinci kullanıcının stale olan veriyi update etmesine şu şekilde engel olabiliriz; Eğer Ado.Net kullanıyorsanız ve db de bulunan her bir row icin birer versiyon

numarasi vb gibi kaydinizvar ise query nizde where koşuluna o row için güncel olarak bulunan version bilgisini ekleyerek kontrol sağlayabiliriz veya Entity Framework yada NHibernate gibi ORM tool'larından birini kullanıyorsanız bu işlemi size bırakmadan güncellenmek istenen row'a ait versiyon numarasını select işleminde memory de tutuyor ve o veri için update transaction'ı execute edilirken bu versiyon numarası db de karşılaştırıyor. Eğer o versiyon numarası db de bulunan ile aynı ise versiyon numarasını 1 artırıp execution'a izin veriyor değilse hata fırlatıyor. Hata mesajı olarak kullanılan ORM türüne göre "The record you attempted to edit was modified by another user after you got the original value" gibi bir message return ediyor.

 

Hem optimistic hemde pessimistic lock konuları çok fazla önemsemediğimiz anlar olsa da oldukça önemli konulardır. Sonraki yazılarımızda Nhibernate veya Entity Framework kullanarak nasıl bir Optimistic Lock yapısı implement edebiliriz inceleyeceğiz.

Castle Windsor ile Exception Handling Interceptor (Dynamic Proxy)

Daha önceki IoC ve AOP yazılarında  çeşitli konulara değinerek örnek projeler üzerinde anlatımlarda bulunduk. Bu yazımızda da Castle Windsor'dan yararlanarak projelerimizde sıklıkla kullanacağımız bir ExceptionAspect veya Interceptor oluşturacağız. 

Interceptor'lar veya Dynamic Proxies Aspect Oriented Programming'in bir implementasyonudur. Bu bize oluşturduğu proxy ile metodu intercept ederek kendi kodlarımızın arasına inject etmemizi sağlar.

IL kodlarına baktığımızda aşağıdaki resimde olduğu gibi geliştirmiş olduğumuz kodları try catch finally blokları arasına alınır.

 

 

 

 

 

 

 

Daha önceki Castle Windsor Kullanarak Cache Interceptor Oluşturma yazısındaki örneğimiz üzerinden ilerleyelim. O yazıda il kodu alarak geriye ilçeleri dönen bir case üzerinden caching işlemi yapan aspect geliştirmiştik. Bu yazımızda da aynı örnek üzerinden ilerleyerek projemiz için bir ExceptionHandling aspect oluşturalım. 

İlk olarak ExceptionHandlingInterceptor adında aspect'imizi oluşturalım.

public class ExceptionHandlingInterceptor : IInterceptor
{
    public void Intercept(IInvocation invocation)
    {
        try
        {
            invocation.Proceed();      
        }
        catch (Exception ex)
        {
            LogManager.Log("Exception in : " + invocation.Method.Name + " method." + ex);

            invocation.ReturnValue = new BaseResponse { IsSuccess = false };
        }
    }
}

Yukarıdaki kodu incelediğimizde özetle şunu söylüyor ;

  • Invoke edilecek metodu try-catch-finally bloğu arasına al
  • Invoke edilen metot içerisinde hata alırsan catch bloğuna gir ve önce alınan hatayı log'a yaz
  • Sonrasında BaseResponse modelini initialize ederek client'a clear bir response model dön.

Tabi ki de çok daha farklı ayrıntıları log'a yazmanız gerekir ancak sample olduğundan şimdilik bunları yazalım.

İkinci adım olarak yazmış olduğumuz Exception aspect'ini container'a register etmek var.

Castle Windsor Kullanarak Cache Interceptor Oluşturma örneğinde ServiceInstaller class'ını olduğu gibi alıp ExceptionHandlingInterceptor'ını register işlemimizi yapalım.

public class ServiceInstaller : IWindsorInstaller
{
    public void Install(IWindsorContainer container, IConfigurationStore store)
    {
        container.Register(Component.For<CacheInterceptor>().LifestyleSingleton());
        container.Register(Component.For<ExceptionHandlingInterceptor>().LifestyleSingleton());

        container.Register(Component.For(typeof(ILocationService))
                 .ImplementedBy(typeof(LocationService))
                 .Interceptors(typeof(CacheInterceptor)
                 .Interceptors(typeof(ExceptionHandlingInterceptor))));
    }
}

Yukarıda ki register işlemi şunu söylüyor; "ILocationService interface'inin implementasyonu LocationService class'ı dır. Bu implementasyona ait 2 adet Interceptor var "CacheInterceptor" ve "ExceptionHandlingInterceptor" adında ve bu interceptor'lar LifeStyleSingleton olarak container'a register edilmişlerdir."

Uygulamamız hazır diyebiliriz. LocationService içerisinde bulunan GetDistrictsByCityCode adlı metodumuzda test etmek için aşağıdaki gibi kendimiz bir exception throw edelim.

public class LocationService : ILocationService
{
    public GetDistrictsByCityCodeResponse GetDistrictsByCityCode(int cityCode)
    {
        throw new NullReferenceException();
    }
}

Projemizi run edip ilgili endpoint'e istekte bulunduğumuzda exception fırlatılıp Interceptor içerisinde bulunan catch bloğuna düşecektir ve loglama işlemini yaptıktan sonra client'a BaseResponse modelini dönecektir.

 

Peki bunları yaparak ne sağladık ? 

  1. Exception Handling işini eski usül projede yüzlerce try-catch bloğu oluşturmak yerine AOP'in faydalarından yararlanarak tek bir yerden yönetebilir duruma getirdik.
  2. Çok daha reusable olan ve başka yerlerde de kullanabileceğimiz bir yapı tasarladık.
  3. Uygulama exception fırlattığında client'a saçma sapan StackTrace mesajı yerine her response da aldığı BaseResponse modelini dönerek response'larımızı daha tutarlı bir hale getirdik.