Java & Kotlin/Spring

Spring Kafka 간단하게 맛보기

devson 2021. 7. 11. 23:11

SpringBoot application에서 간단하게 Kafka Topic을 consume하고 message를 produce하는 방법에 대해 알아보자.

 

해당 코드는 여기 에서 확인 가능하다.

 

Kafka Topic 생성

간단하게 테스트하고 시각적으로 쉽게 확인할 수 있는 환경을 위해 GUI를 사용한다.

Client로 Conduktor를 사용한다.

Conduktor Client

my_topic 으로 Topic을 생성한다.

Topic 생성

my_topic Topic은 방금 생성되었기 때문에 아직 Consumer가 붙어있지 않다.

 

이제 Spring Kafka를 통해서 Kafka Topic을 consume하고 message를 produce하는 법에 대해 알아보자.

 

Spring Kafka를 사용하여 Consumer 설정

SpringBoot application을 생성한다.

(Web은 추후 Producer 테스트 용으로 추가한다)

 

Consumer 설정을 한다.

이 때 Consumer Group ID는 my_consumer로 하겠다.

(application.properties로도 설정 가능)

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory

@EnableKafka
@Configuration
class KafkaConfig {
    // Consumer
    @Bean
    fun consumerFactory(): ConsumerFactory<String, String> {
        return DefaultKafkaConsumerFactory(
            mapOf(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092", // Broker 설정 (separator ',')
                ConsumerConfig.GROUP_ID_CONFIG to "my_consumer", // Consumer Group 설정
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            )
        )
    }

    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val kafkaListenerContainerFactory = ConcurrentKafkaListenerContainerFactory<String, String>()
        kafkaListenerContainerFactory.consumerFactory = this.consumerFactory()
        return kafkaListenerContainerFactory
    }
}

 

설정 후 KafkaListener를 통해 my_topic Topic message를 받아 간단하게 처리하는 기능을 추가한다.

import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class MyConsumer {
    @KafkaListener(topics = ["my_topic"])
    fun helloTopicHandler(message: String){
        println("Kafka message: $message")
    }
}

 

Application을 실행하면 다음과 같이 my_topic Topic 구독에 대한 log가 뜨고

Kafka에 Consumer가 추가된 것도 확인할 수 있다.

 

Conduktor를 통해 my_topic Topic에 message를 하나 produce 해보자.

그러면 KafkaListener로 처리했던 내용이 console에 나오는 것을 볼 수 있다.

 

Spring Kafka를 사용하여 Producer 설정

Consumer를 설정했던 KafkaConfig 에 추가로 Producer 설정을 한다.

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.*

@EnableKafka
@Configuration
class KafkaConfig {
    // Consumer
    @Bean
    fun consumerFactory(): ConsumerFactory<String, String> {
        return DefaultKafkaConsumerFactory(
            mapOf(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092", // Broker 설정 (separator ',')
                ConsumerConfig.GROUP_ID_CONFIG to "my_consumer", // Consumer Group 설정
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            )
        )
    }

    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val kafkaListenerContainerFactory = ConcurrentKafkaListenerContainerFactory<String, String>()
        kafkaListenerContainerFactory.consumerFactory = this.consumerFactory()
        return kafkaListenerContainerFactory
    }

    // Producer
    @Bean
    fun producerFactory():ProducerFactory<String, String> {
        return DefaultKafkaProducerFactory(
            mapOf(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092", // Broker 설정 (separator ',')
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            )
        )
    }

    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, String> {
        return KafkaTemplate(this.producerFactory())
    }
}

그리고 HTTP 요청을 받아 my_topic에 message를 produce하는 API를 하나 만든다.

@RestController
class MyProducer(
    private val kafkaTemplate: KafkaTemplate<String, String>
) {
    @GetMapping
    fun produceHelloTopic(@RequestParam("message") message: String) {
        this.kafkaTemplate.send("my_topic", message)
    }
}

 

Application을 재기동시키고 요청을 보내보자.

$ curl http://localhost:8080?message=test_producing_message
$ curl http://localhost:8080?message=test_second_message

 

그러면 기존에 KafkaListener로 설정한 Consumer를 통해 message가 출력되는 것을 확인할 수 있다.