[Transactional Outbox] CDC 기반의 transaction log tailing 구현
[Transactional Outbox] 개요 에서는 Transactional Outbox pattern에 대한 이론적인 부분을 살펴보았다.
이번 포스팅에서는 실제로 CDC를 사용하여 transaction log tailing 방식을 구현하는 예제에 대해 살펴보도록 하겠다.
(전체 코드는 여기에서 확인할 수 있다)
개요
다음과 같은 아키텍처의 Transactional Outbox pattern을 구현한다.
- 동일한 transaction 내에서 서비스 데이터 업데이트와 도메인 이벤트 데이터 저장을 한다.
- CDC를 통해 DB에 도메인 이벤트 데이터가 추가되었음을 감지하고, '데이터 추가됨 이벤트'를 Kafka topic으로 발행한다.
- 2 에서의 '데이터 추가됨 이벤트' message를 가져온 다음, 이를 도메인 이벤트 형태로 가공한 후 대상 topic으로 발행한다.
인프라 구성
docker compose
transaction log tailing 구현에 있어 필요한 인프라를 docker compose를 통해 구성하도록 한다.
(설정 및 데이터를 편리하게 확인할 수 있도록 UI tool도 추가하였다)
- MySQL
- Kafka cluster
- kafka-01: 9092
- kafka-02: 9093
- kafka-03: 9094
- Kafka UI
- Debezium
- Debezium UI
services:
mysql:
image: mysql:8.0
ports:
- "3306:3306"
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: playground
networks:
- local-infra
# reference: https://velog.io/@jthugg/make-local-kafka-cluster-kraft-mode-feat-debezium-cdc
kafka-01:
image: apache/kafka:3.8.1
ports:
- "9092:9092"
volumes:
- ./docker-volume/kafka/secrets:/etc/kafka/secrets
- ./docker-volume/kafka/config:/mnt/shared/config
environment:
CLUSTER_ID: "event-broker"
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-01:29092,2@kafka-02:29093,3@kafka-03:29094"
KAFKA_LISTENERS: "PLAINTEXT://:19092,CONTROLLER://:29092,EXTERNAL://:9092"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-01:19092,EXTERNAL://localhost:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT'
KAFKA_PROCESS_ROLES: 'broker,controller'
networks:
- local-infra
kafka-02:
image: apache/kafka:3.8.1
ports:
- "9093:9093"
volumes:
- ./docker-volume/kafka/secrets:/etc/kafka/secrets
- ./docker-volume/kafka/config:/mnt/shared/config
environment:
CLUSTER_ID: "event-broker"
KAFKA_NODE_ID: 2
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-01:29092,2@kafka-02:29093,3@kafka-03:29094"
KAFKA_LISTENERS: "PLAINTEXT://:19093,CONTROLLER://:29093,EXTERNAL://:9093"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-02:19093,EXTERNAL://localhost:9093"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT'
KAFKA_PROCESS_ROLES: 'broker,controller'
networks:
- local-infra
kafka-03:
image: apache/kafka:3.8.1
ports:
- "9094:9094"
volumes:
- ./docker-volume/kafka/secrets:/etc/kafka/secrets
- ./docker-volume/kafka/config:/mnt/shared/config
environment:
CLUSTER_ID: "event-broker"
KAFKA_NODE_ID: 3
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-01:29092,2@kafka-02:29093,3@kafka-03:29094"
KAFKA_LISTENERS: "PLAINTEXT://:19094,CONTROLLER://:29094,EXTERNAL://:9094"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-03:19094,EXTERNAL://localhost:9094"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT'
KAFKA_PROCESS_ROLES: 'broker,controller'
networks:
- local-infra
kafka-ui:
image: provectuslabs/kafka-ui:latest
depends_on:
- kafka-01
- kafka-02
- kafka-03
ports:
- "9090:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-01:19092,kafka-02:19093,kafka-03:19094
networks:
- local-infra
debezium:
image: debezium/connect:3.0.0.Final
ports:
- "8083:8083"
depends_on:
- mysql
- kafka-01
- kafka-02
- kafka-03
environment:
- BOOTSTRAP_SERVERS=kafka-01:19092,kafka-02:19093,kafka-03:19094
- GROUP_ID=debezium-00
- CONFIG_STORAGE_TOPIC=DEBEZIUM_CONNECT_CONFIGS
- OFFSET_STORAGE_TOPIC=DEBEZIUM_CONNECT_OFFSETS
- STATUS_STORAGE_TOPIC=DEBEZIUM_CONNECT_STATUSES
- CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
networks:
- local-infra
debezium-ui:
image: debezium/debezium-ui:latest
container_name: debezium-ui
ports:
- "9091:8080"
depends_on:
- debezium
environment:
- KAFKA_CONNECT_URIS=http://debezium:8083
networks:
- local-infra
networks:
local-infra:
driver: bridge
(설정은 [Kafka] 로컬 환경 클러스터 구축기 feat. Debezium CDC를 참고하였다)
Debezium MySQL connector
CDC를 위해 Debezium을 통해 MySQL connector를 추가한다.
Debezium이 docker 네트워크 상으로 local-infra 라는 네트워크에 있고, 동일한 네트워크의 MySQL과 Kafka cluster에 연결하여야한다.
그렇기 때문에 database.hostname, schema.history.internal.kafka.bootstrap.servers는 docker compose 스크립트에 명시된 서비스 명을 사용하도록 한다.
POST localhost:8083/connectors
Content-Type: application/json
{
"name": "service-db-event-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "root",
"database.password": "root",
"database.server.id": "123456",
"database.server.name": "service-db",
"database.include.list": "playground",
"table.include.list": "playground.events",
"topic.prefix": "cdc",
"schema.history.internal.kafka.bootstrap.servers": "kafka-01:19092,kafka-02:19093,kafka-03:19094",
"schema.history.internal.kafka.topic": "schema-history.playground.events"
}
}
예제에서는 MySQL의 'playground' db 내의 'events' 테이블에 대해 CDC를 적용하였다. (http 파일)
(property에 대한 설명은 document를 참고)
Kafka UI(http://localhost:9090)에서 topic을 확인해보면 CDC 설정 후에 관련 topic들이 추가된 것을 확인할 수 있다.
그리고 Debezium UI(http://localhost:9091)에서도 추가된 MySQL connector의 정보를 확인할 수 있다.
코드 구현
event outbox
서버 애플리케이션에서 이벤트 발행 시 DB에 event를 저장하는 event outbox를 구현한다.
table은 아래와 같으며 Kafka에 이벤트에 대한 message를 기준으로 table을 설계하였다.
CREATE TABLE events
(
id BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT,
partition_key VARCHAR(200) NOT NULL,
topic VARCHAR(200) NOT NULL,
payload JSON NOT NULL,
created_at DATETIME(2) NOT NULL
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
이에 해당하는 Entity는 아래와 같다. (ORM framework으로는 Spring Data JDBC 사용하였다)
@Table(name = "events")
class Event {
@Id
var id: Long? = null
private set
@Column(value = "partition_key")
val partitionKey: String
@Column(value = "topic")
val topic: String
@Column(value = "payload")
val payload: String // actual column type is JSON
@CreatedDate
@Column(value = "created_at")
val createAt: LocalDateTime = LocalDateTime.now()
constructor(partitionKey: String, topic: String, payload: Any) {
this.partitionKey = partitionKey
this.topic = topic
this.payload = when (payload) {
is String -> payload
else -> payloadObjectMapper.writeValueAsString(payload)
}
}
companion object {
private val payloadObjectMapper = jacksonObjectMapper()
}
}
그리고 이 이벤트를 DB에 저장함으로써 이벤트를 발행하는 publisher는 아래와 같다.
@Component
class EventPublisher(
private val repository: EventRepository,
) {
fun publish(event: Event) {
this.repository.save(event)
}
}
interface EventRepository : CrudRepository<Event, Long>
이제 도메인 이벤트 발행이 필요한 곳에서 EventPublisher를 사용하면 된다.
@Component
class OrderEventPublisher(
private val eventPublisher: EventPublisher,
) {
fun orderPlaced(event: OrderPlacedEvent) {
this.eventPublisher.publish(
Event(
partitionKey = event.orderId.toString(),
topic = "order.placed",
payload = event,
)
)
}
}
data class OrderPlacedEvent(
val orderId: Long,
)
@Service
class OrderService(
private val orderRepository: OrderRepository,
private val orderEventPublisher: OrderEventPublisher,
) {
// Order와 Event는 동일한 Transactional으로 DB에 저장된다.
@Transactional
fun placeOrder(productId: String, quantity: Int): Order {
val order = this.orderRepository.save(Order(productId = productId, quantity = quantity))
this.orderEventPublisher.orderPlaced(OrderPlacedEvent(orderId = order.id!!))
return order
}
}
위 예제 코드를 통해 생성된 이벤트 DB 데이터는 아래와 같다.
CDC event 처리
MySQL connector이 정상적으로 생성되었으면 이벤트 데이터가 DB에 쌓였을 때 아래와 같이 CDC event message가 topic에 발행되었을 것이다.
이제 이 CDC event message를 listen 하여 해당되는 domain event 형태로 변환 후 대상 topic으로 message를 발행시켜보자.
(아래 도식에서 ③단계)
KafkaListener의 예시는 아래와 같다.
(이전에 발행된 CDC 이벤트를 가져오도록 spring.kafka.consumer.auto-offset-reset 설정을 'earliest'으로 하였다. 예제의 Spring Kafka 설정은 여기를 참고)
@Component
class CdcEventListener(
private val kafkaTemplate: KafkaTemplate<String, Any>,
) {
private val logger = KotlinLogging.logger {}
@KafkaListener(topics = ["cdc.playground.events"])
fun listenCdcEvent(cdcEventRecord: ConsumerRecord<String, String>) {
// CDC event
val rawCdcEventMessage = cdcEventRecord.value()
this.logger.debug { "Raw CDC event message: \n${rawCdcEventMessage}" }
val cdcEvent = objectMapper.readValue<Map<String, Any>>(rawCdcEventMessage)
val cdcPayload = cdcEvent["payload"] as Map<String, Any>
// if not 'insert data' event
if (cdcPayload["op"] != "c") {
return
}
// domain event
val eventMessage = cdcPayload["after"] as LinkedHashMap<String, Any>
this.logger.info { "Event message in CDC event: $eventMessage" }
val topic = eventMessage["topic"] as String
val partitionKey = eventMessage["partition_key"] as String
val payload = eventMessage["payload"] as String
// publish domain event message
this.kafkaTemplate.send(topic, partitionKey, payload)
}
companion object {
private val objectMapper = jacksonObjectMapper().apply {
configure(DeserializationFeature.USE_LONG_FOR_INTS, true)
}
}
}
서버를 실행 시키고 발행된 domain event message를 아래와 같이 확인할 수 있다.
Domain event listener
구현하고자하는 transaction log tailing의 아키텍처 구현은 완료되었다.
이제 발행된 domain event message에 대한 listener를 필요에 맞게 만들어서 사용하면 된다.
아래는 그 예제이다.
@Component
class OrderEventListener {
private val logger = KotlinLogging.logger {}
@KafkaListener(topics = ["order.placed"])
fun listenOrderPlacedEvent(eventRecord: ConsumerRecord<String, String>) {
val rawEventMessage = eventRecord.value()
val orderPlacedEvent = eventObjectMapper.readValue<OrderPlacedEvent>(rawEventMessage)
this.logger.info { "Order placed event: $orderPlacedEvent" }
// do something with OrderPlacedEvent
}
companion object {
private val eventObjectMapper = jacksonObjectMapper()
}
}
지금까지 CDC를 사용한 transaction log tailing 구현 예제를 살펴보았다.
보시다시피 인프라적으로 조금은 복잡한 형태의 아키텍처가 만들어졌다.
transaction log tailing을 사용하고자하는 경우는 정말로 필요한 기술이 맞는지, 운영할 수 있는 충분한 지식과 리소스가 있는지를 고려할 필요가 있다.
(많은 회사들이 transaction log tailing 보다는 polling publisher 방식을 사용하고 있다)