Debezium Kullanarak PostgreSql Db Change Data Capture (CDC) Nasıl Yapılır ?

Günümüz yazılım dünyasında her türlü verinin ne kadar önemli olduğunu hepimiz biliyoruzdur. Özellikle doğrudan end-user oriented çalışan sistemlerde sürekli olarak farklı noktalara veri akışı sağlandığını görürüz. Örneğin bir e-ticaret sitesinde payment işlemi yapıldığında aynı anda 3-4 farklı noktaya bu işlem ile ilgili verileri aktarmanız gerekiyor (fraud-erp-dataScience-customer etc.) ve bunu klasik çözüm olarak gece çalışan bir job ile vs değilde anlık 1-2 sn içerisinde iletmeniz gerekmekte. Biraz daha teknik olarak açıklamak gerekirse; payment db'nizde bulunan transaction tablonuzdaki bütün crud(create,read,update,delete) işlemlerinden haberdar olması gereken farklı ekipler var ve bunu bir şekilde çözmeniz beklenmekte. Eğer event-driven bir mimariniz var ise ilgili ekiplerin rmq'larına federation kurarak event'ler fırlatabilirsiniz ancak kod tarafında geliştirme yapmanız gereken bir çözüm ve şu an bunu istemiyoruz.

Bu tür problemlere çözüm olarak önerilen yöntemleri araştırdığımızda karşımıza bunlardan biri olan Change Data Capture(CDC) yaklaşımı çıkmakta. CDC db seviyesinde data-tracking yaparak çözümler üreten bir dizi design pattern modelidir.

Bu yazımızda PostgreSql veritabınında bulunan bir tabloyu Debezium ile track ederek bütün crud işlemlerini Kafka'ya birer message olarak publish edip sonrasında bir consumer ile bu message'ları consume eden bir yapı tasarlayacağız.

1Apache Kafka - Messaging topic oluşturarak database'de track edilecek kayıtları store etmek için kullanacağız.

2Kafka Connect - Apache Kafka ve diğer sistemler arasında ölçeklenebilir ve güvenilir veri akışını sağlamak için kullanılan bir tool'dur. Veritabanlarındaki kayıtları Kafka'ya ve kafka dışındaki başka bir source'a taşıyabilen connector'leri tanımlamak için kullanılır.

3Debezium - Debezium open source olarak geliştirilen distributed bir change data capture (CDC) platformudur. Database'de track edilen verilerin Kafka Connect Apı kullanılarak Kafka'da tanımlı topic'e aktarılmasını sağlar.

Uygulamamızı geliştirmeye başlayalım. Öncelikle hazırlamış olduğumuz docker-compose.yml dosyamız ile zookeeper, kafka, postgresql ve connector için gerekli image'leri çekip docker'da çalışır hale getirelim.

version: '3.1'
services:
    postgres:
        image: debezium/postgres
        environment:
          POSTGRES_PASSWORD: qwerty
          POSTGRES_USER: appuser
        volumes:
           - ./postgres:/data/postgres
        ports:
          - 6532:6532 
    zookeeper:
        image: confluentinc/cp-zookeeper
        ports:
          - "2181:2181"
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
    kafka:
        image: confluentinc/cp-kafka
        depends_on:
          - zookeeper
          - postgres
        ports:
          - "9092:9092"
        environment:
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_LOG_CLEANER_DELETE_RETENTION_MS: 5000
          KAFKA_BROKER_ID: 1
          KAFKA_MIN_INSYNC_REPLICAS: 1
    connector:
        image: debezium/connect:latest
        ports:
          - "8083:8083"
        environment:
          GROUP_ID: 1
          CONFIG_STORAGE_TOPIC: my_connect_configs
          OFFSET_STORAGE_TOPIC: my_connect_offsets
          BOOTSTRAP_SERVERS: kafka:9092
        depends_on:
          - zookeeper
          - postgres
          - kafka

Dosyayı oluşturduktan sonra ilgili dizinde;

docker-compose up

komutunu çalıştırarak kurulumlara başlayalım. 

Bütün kurulumlar bittikten sonra farklı bir terminalde docker ps komutu ile aşağıdaki gibi container'larımızın ayağa kalktığını görebilmemiz gerekmekte.

PostgreSql  

İlk olarak postgres container'a girip appuser adında bir user tanımlayıp sonrasında Payment database'ini oluşturup Transaction tablomuzu yaratalım. 

docker exec -it df3f1b598557 bash

psql -h localhost -p 5432 -U appuser

CREATE DATABASE payment;
\c payment
CREATE TABLE transaction(id SERIAL PRIMARY KEY, amount int, customerId varchar(36));

insert into transaction(id, amount,customerId) values(51, 12,'37b920fd-ecdd-7172-693a-d7be6db9792c');

Bütün komutlarımızı başarılı bir şekilde çalıştırdığımızda appuser kullanıcısı ile login olup payment adında bir database ve transaction adında bir tablo yaratmış olduk. Örnek olarak bir tanede transaction insert ettik.

Debezium 

Şimdi transaction tablosunda gerçekleşen her değişikliği Kafka'ya taşımak istiyoruz ve bunun için bir connector oluşturmalıyız. Connector, verileri veritabanından (veya başka bir depolama sisteminden) Kafka cluster'a (yada tam tersi) taşımaktan sorumlu olan bir uygulamadır. Kafka conenctor'e aşina değilseniz buradan daha fazla bilgiye ulaşabilirsiniz. Burada PostgreSql CRUD işlemlerini Apache Kafka cluster'ına taşımak istiyoruz.

Debezium özetle, PostgreSql'den gelen tüm change-event'lerini okuyabilen ve bunları Kafka'da yayınlayabilen bir Kafka connector'üdür.

Debezium'un restApi'si bulunmakta ve bu api'yi kullanarak gerekli tanımlamalarımızı aşağıdaki gibi yapalım.

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
{
 "name": "payment-connector",
 "config": {
 "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
 "tasks.max": "1",
 "database.hostname": "postgres",
 "database.port": "5432",
 "database.user": "appuser",
 "database.password": "qwerty",
 "database.dbname" : "payment",
 "database.server.name": "dbserver1",
 "database.whitelist": "payment",
 "database.history.kafka.bootstrap.servers": "kafka:9092",
 "database.history.kafka.topic": "schema-changes.payment"
 }
}'

Connector'ümüzü tanımladık ve db bilgilerini vererek hangi schema ve database'e ait event'leri track edeceğini belirttik. Eğer curl localhost:8083/connectors/payment-connector/status komutunu çalıştırırsanız connector'ün RUNNING state'de olduğunu görebilirsiniz.

Şimdi ise kafka container'ın içerisine gidip topiclerimizi listeleyelim.

docker exec -it b06d7ea41b03 bash

kafka-topics --zookeeper zookeeper:2181 --list

__confluent.support.metrics
__consumer_offsets
connect-status
dbserver1.public.transaction
my_connect_configs
my_connect_offsets

Görüldüğü üzre dbserver1.public.transaction topic'i oluşmuş. Transaction tablosunun bütün event'lerini connector aracılığı ile kafka'da ki bu topic'te store edilecek.

Şimdi aşağıdaki komutu kullanarak bir consumer ayağa kaldırıp topic'e gelen message'ları dinlemeye başlayalım.

kafka-console-consumer --bootstrap-server kafka:9092 --from-beginning --topic dbserver1.public.transaction --property print.key=true --property key.separator="-"

Sonrasında farklı bir terminalde tekrardan payment db'ye connect olup transaction tablosuna bir insert ve update işlemi yapalım ve bu işlemlerin kafka-consumer tarafından consume edildiğini görelim.

insert into transaction(id, amount,customerId) values(85, 87,'37b920fd-ecdd-7172-693a-d7be6db9792c');
update transaction set amount=77 where id=85

Payload'a bakacak olursak,

Insert işlemi için;   "op":"c" olarak iletilir.

{
   "payload":{
      "before":null,
      "after":{
         "id":85,
         "amount":87,
         "customerid":"37b920fd-ecdd-7172-693a-d7be6db9792c"
      },
      "source":{
         "version":"1.0.2.Final",
         "connector":"postgresql",
         "name":"dbserver1",
         "ts_ms":1583931003883,
         "snapshot":"false",
         "db":"payment",
         "schema":"public",
         "table":"transaction",
         "txId":568,
         "lsn":23936360,
         "xmin":null
      },
      "op":"c",
      "ts_ms":1583931003889
   }
}

Update işlemi için;   "op":"u" olarak iletilir.

{
   "payload":{
      "before":null,
      "after":{
         "id":85,
         "amount":77,
         "customerid":"37b920fd-ecdd-7172-693a-d7be6db9792c"
      },
      "source":{
         "version":"1.0.2.Final",
         "connector":"postgresql",
         "name":"dbserver1",
         "ts_ms":1583931065480,
         "snapshot":"false",
         "db":"payment",
         "schema":"public",
         "table":"transaction",
         "txId":569,
         "lsn":23936888,
         "xmin":null
      },
      "op":"u",
      "ts_ms":1583931065486
   }
}

Delete işlemi için;

delete transaction where id=85;

{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"int32",
            "optional":false,
            "field":"id"
         }
      ],
      "optional":false,
      "name":"dbserver1.public.transaction.Key"
   },
   "payload":{
      "id":85
   }
}"-null"

CRUD işlem tipine göre connector aracılığı ile kafka'da bulunan topic'e yukarıdaki gibi message'lar iletilir. 

Özetleyecek olursak; yazinin başındada bahsettiğim gibi bu işlemi yapabilmenin farklı teknikleri bulunmakta Debezium bunlardan sadece biri. CDC tekniğini kullanarak veritabanı değişikliklerimizi gerçek zamanlı olarak Kafka'ya push edebilir ve sonrasında kafka'da blunan bu mesajları bir consumer aracılığı ile ihtiyaç duyulan noktalara gönderebiliriz. Consumer tarafinda farkli dillerde kutuphaneler kullanarak kendi business’larinizi isletebilirsiniz. 

Source

Comments (3) -

  • Harika bir yazı olmuş, genelde ingilizce kaynaklar var çok faydalı oldu teşekkürler.
    • Degerli yorumun icin tesekkurler Smile
  • Akıcı, sade ve anlaşılır bir anlatım olmuş, elinize sağlık.

Add comment