본문 바로가기
개발/kafka

[kafka] auto commit 은 어떻게 동작할까?

by 상용최 2024. 12. 7.
반응형

kafka 에서는 auto commit 이라는 기능을 지원한다.
auto commit 이란 consumer 에서 offset 을 자동으로 커밋하는 기능을 뜻한다.

그렇다면 offset 은 무엇일까? auto commit 이 어떤 기능인지 이해를 하려면 offset 을 먼저 알아야 한다.

Offset 이란?

offset 은 kafka 에서 데이터를 관리하기 위한 기능으로 카프카에 메시지를 추가할 때 토픽의 특정 파티션에 추가되게된다.

각 메시지는 토픽에 순차적으로 저장되며 그 순서에 따라 가지는 번호를 offset 이라고 한다.

* 메시지가 offset 을 가지는것은 아니다.

Auto Commit

다시 auto commit 으로 돌아와서 얘기를 해보고자 한다.

카프카에 메시지에 추가될 때 파티션의 메시지별로 고유한 offset 이 부여되게 된다는것까지 알아보았다.
그러면 이제 Consumer 입장을 생각해보도록 하자.

Consumer 는 메시지를 가져와서 처리하는 역할을 가진다.
데이터를 처리하기 위해서는 "어디까지" 처리했는지와 "어디서부터" 처리를 해야하는지를 알아야한다.

Consumer 는 이것을 알기위하여 offset 을 활용한다.

예를들어 0번 offset 부터 5개의 데이터를 줘! 라고 요청할 수 있다. 그리고 모두 처리한 후 5번까지 처리했어 라고 알리게된다.

이때 어디까지 처리했는지 알리는 작업을 commit 이라고 하며, 이를 자동으로 하는것을 auto commit 기능이라고 한다.

 

auto commit 은 2가지 옵션에 영향을 받는다.

  • auto.commit.enable : true 일때 동작한다.
  • auto.commit.interval.ms : offset 이 저장되는 주기를 뜻하며, 기본값은 5초이다.

Interval.ms 만 지나면 컨슈머는 자동으로 오프셋정보를 커밋할까?

처음 구독을 시작하여 consume 을 시작하면 아래와같은 정보를 가지고있다.

 

그렇다면 아래와 같이 데이터를 가져온 후 무한정 대기를 하게되면 어떤일이 발생할까?
Interval.ms 를 500ms 로 세팅했으니 바로 오프셋을 커밋할까?

테스트를 진행해보자

object KafkaConsumerFactory {
    private val props = Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group")
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
        put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
        put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "500")
    }

    fun create(): KafkaConsumer<String, String> {
        return KafkaConsumer<String, String>(props)
    }
}

fun main() {
    val consumer = KafkaConsumerFactory.create()
    consumer.subscribe(listOf("test-topic"))

    consumer.use { consumer ->
        while (true) {
            val records = consumer.poll(Duration.ofMillis(500))

            for (record in records) {
                println(record)
            }
            Thread.sleep(Long.MAX_VALUE)
        }
    }
}

 

데이터를 정상적으로 가지고왔고 print 까지 했다. 하지만 아무리 기다려도 아래와같이 offset 정보가 업데이트 되지않는것을 확인할 수 있다.

 

 

 

그렇다면 아래와같이 코드를 조금 변경해보고 다시 실행해보도록 하자.

fun main() {
    val consumer = KafkaConsumerFactory.create()
    consumer.subscribe(listOf("test-topic"))

    consumer.use { consumer ->
        while (true) {
            val records = consumer.poll(Duration.ofMillis(500))
            process(records)
        }
    }
}

fun process(records: ConsumerRecords<String, String>?) {
    Thread {
        if (records != null) {
            for (record in records) {
                println(record)
            }
        }
        sleep(Long.MAX_VALUE)
    }.start()
}

 

이번에는 print 를 하고 오프셋정보를 commit 한것을 확인할 수 있다.

눈치가 빠른 사람이라면 약간의 차이이지만 무엇이 차이인지 알 수 있을것이다. 바로 poll() 메소드를 호출했는가? 이다. 

이전코드에서는 poll 작업을 수행한 후 Thread.sleep 을 수행하기때문에 poll 메소드 호출이 안되지만 수정한 코드에서는 새로운 스레드를 생성한 후 해당 스레드에게 작업을 위임한 후 다시 poll 메소드를 호출한다.

auto commit 이랑 poll 이랑 무슨 상관이 있는것일까?

auto commit 의 동작원리

auto commit 의 동작원리는 commit request 를 만드는 CommitRequestManager 를 보면 알 수 있다.

CommitRequestManager 의 maybeAutoCommitAsync 메소드를 보면 internal 기간이 지났고 poll() 이 호출될 때 요청을 전송한다고 되어있다.

즉, poll() 메소드가 호출되어야 offset commit 요청이 전송되는것이다.

 

poll 메소드가 호출되면 무슨일이 일어나는지 조금 더 자세하게 알아보도록 하자.

 

위의 이미지는 poll 메소드의 코드이다.
중간즈음을 보면 pollForFetches(timer) 라는 코드가 있다. 이 코드가 실제 데이터를 가져오는 부분이라고 유추가 된다.

이 메소드를 조금 더 파해쳐보도록 하겠다.

 

poolForFetches 메소드 안에 들어가보면 위쪽에 collectFetch 라는 메소드를 호출하는것을 확인할 수 있다.

이 메소드를 통해 데이터를 가져오는것으로 보인다.

다시 한뎁스 더 들어가보도록 하자.

collectFetch 메소드 내부로 들어오면 데이터를 반환하기전에 ConsumerNetworkThread 를 wakeup 한다는 주석이 달려있다.

ConsumerNetworkThread 가 무슨일을 하는지 알아보도록 하자

 

ConsumerNetworkThread 의 runOne() 메소드를 보면 아래와 같은 주석이 달려있다.

KafkaClient.poll 메소드를 통하여 request 를 전송한다고 한다.

 

runOnce 메소드 내부를 살펴보면 requestManager 들의 poll 메소드를 호출하는것으로 보여진다.

auto commit 의 동작원리를 설명하는 처음부분에서 CommitRequestManager 의 maybeAutoCommitAsync 메소드에서 전송한다고 하였다.

CommitRequestManager 는 RequestManager interface 의 구현체이고 CommitRequestManager 의 poll 메소드안에서 maybeAutoCommitAsync 메소드를 호출하는것을 확인할 수 있다.

 

이로써 우리는 kafka 의 auto commit 이 어떠한 방식으로 이루어지는지 알아보았다.

반응형

댓글