ETC/System Design

[Transactional Outbox] CDC 기반의 transaction log tailing 구현

devson 2024. 11. 25. 17:50

[Transactional Outbox] 개요 에서는 Transactional Outbox pattern에 대한 이론적인 부분을 살펴보았다.

이번 포스팅에서는 실제로 CDC를 사용하여 transaction log tailing 방식을 구현하는 예제에 대해 살펴보도록 하겠다.

 

(전체 코드는 여기에서 확인할 수 있다)

 

개요

다음과 같은 아키텍처의 Transactional Outbox pattern을 구현한다.

 

  1. 동일한 transaction 내에서 서비스 데이터 업데이트와 도메인 이벤트 데이터 저장을 한다.
  2. CDC를 통해 DB에 도메인 이벤트 데이터가 추가되었음을 감지하고, '데이터 추가됨 이벤트'를 Kafka topic으로 발행한다.
  3. 2 에서의 '데이터 추가됨 이벤트' message를 가져온 다음, 이를 도메인 이벤트 형태로 가공한 후 대상 topic으로 발행한다.

 

인프라 구성

docker compose

transaction log tailing 구현에 있어 필요한 인프라를 docker compose를 통해 구성하도록 한다.

(설정 및 데이터를 편리하게 확인할 수 있도록 UI tool도 추가하였다)

  • MySQL
  • Kafka cluster
    • kafka-01: 9092
    • kafka-02: 9093
    • kafka-03: 9094
  • Kafka UI
  • Debezium
  • Debezium UI
services:
  mysql:
    image: mysql:8.0
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: playground
    networks:
      - local-infra

  # reference: https://velog.io/@jthugg/make-local-kafka-cluster-kraft-mode-feat-debezium-cdc
  kafka-01:
    image: apache/kafka:3.8.1
    ports:
      - "9092:9092"
    volumes:
      - ./docker-volume/kafka/secrets:/etc/kafka/secrets
      - ./docker-volume/kafka/config:/mnt/shared/config
    environment:
      CLUSTER_ID: "event-broker"
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-01:29092,2@kafka-02:29093,3@kafka-03:29094"
      KAFKA_LISTENERS: "PLAINTEXT://:19092,CONTROLLER://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-01:19092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_PROCESS_ROLES: 'broker,controller'
    networks:
      - local-infra

  kafka-02:
    image: apache/kafka:3.8.1
    ports:
      - "9093:9093"
    volumes:
      - ./docker-volume/kafka/secrets:/etc/kafka/secrets
      - ./docker-volume/kafka/config:/mnt/shared/config
    environment:
      CLUSTER_ID: "event-broker"
      KAFKA_NODE_ID: 2
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-01:29092,2@kafka-02:29093,3@kafka-03:29094"
      KAFKA_LISTENERS: "PLAINTEXT://:19093,CONTROLLER://:29093,EXTERNAL://:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-02:19093,EXTERNAL://localhost:9093"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_PROCESS_ROLES: 'broker,controller'
    networks:
      - local-infra

  kafka-03:
    image: apache/kafka:3.8.1
    ports:
      - "9094:9094"
    volumes:
      - ./docker-volume/kafka/secrets:/etc/kafka/secrets
      - ./docker-volume/kafka/config:/mnt/shared/config
    environment:
      CLUSTER_ID: "event-broker"
      KAFKA_NODE_ID: 3
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-01:29092,2@kafka-02:29093,3@kafka-03:29094"
      KAFKA_LISTENERS: "PLAINTEXT://:19094,CONTROLLER://:29094,EXTERNAL://:9094"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-03:19094,EXTERNAL://localhost:9094"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_PROCESS_ROLES: 'broker,controller'
    networks:
      - local-infra

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka-01
      - kafka-02
      - kafka-03
    ports:
      - "9090:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-01:19092,kafka-02:19093,kafka-03:19094
    networks:
      - local-infra

  debezium:
    image: debezium/connect:3.0.0.Final
    ports:
      - "8083:8083"
    depends_on:
      - mysql
      - kafka-01
      - kafka-02
      - kafka-03
    environment:
      - BOOTSTRAP_SERVERS=kafka-01:19092,kafka-02:19093,kafka-03:19094
      - GROUP_ID=debezium-00
      - CONFIG_STORAGE_TOPIC=DEBEZIUM_CONNECT_CONFIGS
      - OFFSET_STORAGE_TOPIC=DEBEZIUM_CONNECT_OFFSETS
      - STATUS_STORAGE_TOPIC=DEBEZIUM_CONNECT_STATUSES
      - CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
    networks:
      - local-infra

  debezium-ui:
    image: debezium/debezium-ui:latest
    container_name: debezium-ui
    ports:
      - "9091:8080"
    depends_on:
      - debezium
    environment:
      - KAFKA_CONNECT_URIS=http://debezium:8083
    networks:
      - local-infra

networks:
  local-infra:
    driver: bridge

 

(설정은 [Kafka] 로컬 환경 클러스터 구축기 feat. Debezium CDC를 참고하였다)

 

Debezium MySQL connector

CDC를 위해 Debezium을 통해 MySQL connector를 추가한다.

 

Debezium이 docker 네트워크 상으로 local-infra 라는 네트워크에 있고, 동일한 네트워크의 MySQL과 Kafka cluster에 연결하여야한다.

그렇기 때문에 database.hostname, schema.history.internal.kafka.bootstrap.servers는 docker compose 스크립트에 명시된 서비스 명을 사용하도록 한다.

POST localhost:8083/connectors
Content-Type: application/json

{
  "name": "service-db-event-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",

    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "root",

    "database.server.id": "123456",
    "database.server.name": "service-db",
    "database.include.list": "playground",
    "table.include.list": "playground.events",
    "topic.prefix": "cdc",
    "schema.history.internal.kafka.bootstrap.servers": "kafka-01:19092,kafka-02:19093,kafka-03:19094",
    "schema.history.internal.kafka.topic": "schema-history.playground.events"
  }
}

 

예제에서는 MySQL의 'playground' db 내의 'events' 테이블에 대해 CDC를 적용하였다. (http 파일)

(property에 대한 설명은 document를 참고)

 

Kafka UI(http://localhost:9090)에서 topic을 확인해보면 CDC 설정 후에 관련 topic들이 추가된 것을 확인할 수 있다.

connector 추가 전
connector 추가 후

 

그리고 Debezium UI(http://localhost:9091)에서도 추가된 MySQL connector의 정보를 확인할 수 있다.

 

코드 구현

event outbox

서버 애플리케이션에서 이벤트 발행 시 DB에 event를 저장하는 event outbox를 구현한다.

 

table은 아래와 같으며 Kafka에 이벤트에 대한 message를 기준으로 table을 설계하였다.

CREATE TABLE events
(
    id            BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT,
    partition_key VARCHAR(200) NOT NULL,
    topic         VARCHAR(200) NOT NULL,
    payload       JSON         NOT NULL,
    created_at    DATETIME(2)  NOT NULL
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

 

이에 해당하는 Entity는 아래와 같다. (ORM framework으로는 Spring Data JDBC 사용하였다)

@Table(name = "events")
class Event {
    @Id
    var id: Long? = null
        private set

    @Column(value = "partition_key")
    val partitionKey: String

    @Column(value = "topic")
    val topic: String

    @Column(value = "payload")
    val payload: String // actual column type is JSON

    @CreatedDate
    @Column(value = "created_at")
    val createAt: LocalDateTime = LocalDateTime.now()

    constructor(partitionKey: String, topic: String, payload: Any) {
        this.partitionKey = partitionKey
        this.topic = topic
        this.payload = when (payload) {
            is String -> payload
            else -> payloadObjectMapper.writeValueAsString(payload)
        }
    }

    companion object {
        private val payloadObjectMapper = jacksonObjectMapper()
    }
}

 

그리고 이 이벤트를 DB에 저장함으로써 이벤트를 발행하는 publisher는 아래와 같다.

@Component
class EventPublisher(
    private val repository: EventRepository,
) {
    fun publish(event: Event) {
        this.repository.save(event)
    }
}

interface EventRepository : CrudRepository<Event, Long>

 

이제 도메인 이벤트 발행이 필요한 곳에서 EventPublisher를 사용하면 된다.

@Component
class OrderEventPublisher(
    private val eventPublisher: EventPublisher,
) {
    fun orderPlaced(event: OrderPlacedEvent) {
        this.eventPublisher.publish(
            Event(
                partitionKey = event.orderId.toString(),
                topic = "order.placed",
                payload = event,
            )
        )
    }
}

data class OrderPlacedEvent(
    val orderId: Long,
)

 

@Service
class OrderService(
    private val orderRepository: OrderRepository,
    private val orderEventPublisher: OrderEventPublisher,
) {

    // Order와 Event는 동일한 Transactional으로 DB에 저장된다.
    @Transactional
    fun placeOrder(productId: String, quantity: Int): Order {
        val order = this.orderRepository.save(Order(productId = productId, quantity = quantity))
        this.orderEventPublisher.orderPlaced(OrderPlacedEvent(orderId = order.id!!))
        return order
    }
}

 

 

위 예제 코드를 통해 생성된 이벤트 DB 데이터는 아래와 같다.

 

CDC event 처리

MySQL connector이 정상적으로 생성되었으면 이벤트 데이터가 DB에 쌓였을 때 아래와 같이 CDC event message가 topic에 발행되었을 것이다.

 

이제 이 CDC event message를 listen 하여 해당되는 domain event 형태로 변환 후 대상 topic으로 message를 발행시켜보자.

(아래 도식에서 ③단계)

 

KafkaListener의 예시는 아래와 같다.

(이전에 발행된 CDC 이벤트를 가져오도록 spring.kafka.consumer.auto-offset-reset 설정을 'earliest'으로 하였다. 예제의 Spring Kafka 설정은 여기를 참고) 

@Component
class CdcEventListener(
    private val kafkaTemplate: KafkaTemplate<String, Any>,
) {
    private val logger = KotlinLogging.logger {}

    @KafkaListener(topics = ["cdc.playground.events"])
    fun listenCdcEvent(cdcEventRecord: ConsumerRecord<String, String>) {
        // CDC event
        val rawCdcEventMessage = cdcEventRecord.value()
        this.logger.debug { "Raw CDC event message: \n${rawCdcEventMessage}" }
        val cdcEvent = objectMapper.readValue<Map<String, Any>>(rawCdcEventMessage)
        val cdcPayload = cdcEvent["payload"] as Map<String, Any>
        // if not 'insert data' event
        if (cdcPayload["op"] != "c") {
            return
        }

        // domain event
        val eventMessage = cdcPayload["after"] as LinkedHashMap<String, Any>
        this.logger.info { "Event message in CDC event: $eventMessage" }
        val topic = eventMessage["topic"] as String
        val partitionKey = eventMessage["partition_key"] as String
        val payload = eventMessage["payload"] as String

        // publish domain event message
        this.kafkaTemplate.send(topic, partitionKey, payload)
    }

    companion object {
        private val objectMapper = jacksonObjectMapper().apply {
            configure(DeserializationFeature.USE_LONG_FOR_INTS, true)
        }
    }
}

 

서버를 실행 시키고 발행된 domain event message를 아래와 같이 확인할 수 있다.

 

Domain event listener

구현하고자하는 transaction log tailing의 아키텍처 구현은 완료되었다.

이제 발행된 domain event message에 대한 listener를 필요에 맞게 만들어서 사용하면 된다.

 

아래는 그 예제이다.

@Component
class OrderEventListener {
    private val logger = KotlinLogging.logger {}

    @KafkaListener(topics = ["order.placed"])
    fun listenOrderPlacedEvent(eventRecord: ConsumerRecord<String, String>) {
        val rawEventMessage = eventRecord.value()
        val orderPlacedEvent = eventObjectMapper.readValue<OrderPlacedEvent>(rawEventMessage)
        this.logger.info { "Order placed event: $orderPlacedEvent" }

        // do something with OrderPlacedEvent
    }

    companion object {
        private val eventObjectMapper = jacksonObjectMapper()
    }
}

 


 

지금까지 CDC를 사용한 transaction log tailing 구현 예제를 살펴보았다.

보시다시피 인프라적으로 조금은 복잡한 형태의 아키텍처가 만들어졌다.

transaction log tailing을 사용하고자하는 경우는 정말로 필요한 기술이 맞는지, 운영할 수 있는 충분한 지식과 리소스가 있는지를 고려할 필요가 있다.

(많은 회사들이 transaction log tailing 보다는 polling publisher 방식을 사용하고 있다)