티스토리 뷰
1. 올바른 카프카 컨슈머(KafkaConsumer) 설정 가이드와 내부 동작 분석
[ group.protocol ]
설정에 대한 설명
group.protocol은 consumer가 사용할 그룹 프로토콜을 지정하는 설정으로 현재 classic과 consumer 두 가지 값을 지원한다. 해당 값을 consumer로 설정하면, consumer 그룹 프로토콜이 사용되며 다른 값을 설정하거나 생략하면 classic 그룹 프로토콜이 사용된다.
classic 그룹 프로토콜은 Kafka 0.9 이후 기본 제공되어온 전통적인 Kafka consumer group 관리 방식이다. session.timeout.ms, heartbeat.interval.ms 등이 기존 방식 그대로 작동하며 consumer coordinator가 리밸런싱과 멤버 관리를 전담한다.
반면에 consumer 그룹 프토토콜은 KIP-848에서 제안된 새로운 프로토콜로, 컨슈머 리밸런싱을 위한 차세대 프로토콜이라고 볼 수 있다. 리밸런싱 프로토콜을 개선하여 안정성과 확장성을 높이고자 했으며 이는 Kafka 3.7.0에서 초기 도입되었으며, Kafka 4.0.0에서 정식으로 지원되었으며 향후 기본 프로토콜로 전환될 가능성이 높다.
아직까지는 consumer 그룹 프로토콜에 대한 사용이 대중화되지 않았으므로, classic 프로토콜을 사용해주도록 하자. 또한 아래에서 다루는 설정에 대한 내용 역시 classic 그룹 프로토콜을 기준으로 설명되었다.
group.protocol=CLASSIC
소스코드로 살펴보기
컨슈머의 그룹 프로토콜이 추가됨에 따라 카프카 클라이언트의 코드에도 큰 변경이 생겼다. 먼저 위의 두 가지 프로토콜에 따라 카프카 컨슈머의 구현을 다르게 생성해주는 ConsumerDelegateCreator와 각각의 구현체인 AsyncKafkaConsumer, ClassicKafkaConsumer가 생성되었다.
기존에 우리가 알고 있던 KafkaConsumer는 일종의 구현 위임을 위한 계층으로 추상화되었고, 실제 처리는 AsyncKafkaConsumer, ClassicKafkaConsumer로 위임(delegate) 된다. KafkaConsumer 클래스의 내부를 보면 이러한 부분을 확인할 수 있다.
[ enable.auto.commit와 auto.commit.interval.ms ]
설정에 대한 설명
카프카 컨슈머는 자동 커밋이라는 기능을 제공하는데, enable.auto.commit을 true로 설정하면 컨슈머의 offset을 백그라운드(비동기)로 주기적으로 커밋시킨다. 그리고 이때 어느 주기로 자동 커밋할지를 결정하는 설정이 바로 auto.commit.interval.ms이다.
기본적으로 자동 커밋을 사용하므로 중요한 것은 auto.commit.interval.ms이다. 다음의 그림에서 보이듯이 카프카 컨슈머는 해당 설정에 따라 컨슈머 groupId 그리고 토픽 및 파티션 정보, 파티션에서 다음에 소비할 레코드의 offset 등의 정보를 보내게 된다.
이때 해당 정보는 새로운 메시지의 소비 여부와 무관하게 항상 요청이 가기 때문에 처리량에 따른 트레이드 오프가 필요한 설정이라고 볼 수 있다. 해당 주기를 높이면 커밋 요청이 줄어들기 때문에 네트워크 호출을 감소시킴으로써 성능을 높일 수 있을 것이다. 하지만 장애가 발생하여 자동 커밋에 실패한 경우, 이전 오프셋을 재처리하게 되므로 메시지 중복 처리 가능성이 높아지는 것이다. 따라서 서비스 성향에 따라 해당 값을 적절하게 설정할 필요가 있는데, 통상적으로 1초 정도로 잡으면 적당하다.
enable.auto.commit=true
auto.commit.interval.ms=1000
소스코드로 살펴보기
그렇다면 autocommit이 일어나는 위치는 어디일까? ClassicKafkaConsumer 내부에는 기본적으로 poll 메서드 내부에서 브로커와의 통신이 일어난다. 그리고 poll 메서드 내부에서는 다음과 같이 metadata를 갱신해주는 부분이 존재함을 살펴볼 수 있다.
해당 메서드 내부에서는 coordinator.poll 메서드 내부에서 자동 커밋 처리가 이뤄지므로 coordinator.poll 메서드 내부로 이동해보도록 하자.
ConsumerCoordinatord의 poll 메서드 내부에는 다음과 같이 maybeAutoCommitOffsetsAsync이라는 부분이 존재한다.
그리고 해당 메서드 내부에서는 auto commit이 활성화된 경우 그리고 해당 옵션값의 주기가 지난 경우에만 자동 커밋 요청이 호출됨을 확인할 수 있다.
다음 설정으로 넘어가기 이전에 하나 흥미로운을 살펴볼 수 있는데, 오프셋 commit을 보내는 것은 실제 메시지를 fetch 해오기 이전에 수행된다는 것이다. 즉, 비동기적 auto-commit 처리 이후 fetch가 일어나는 것이다.
따라서 메시지를 처리하는 Listener에서 메시지를 처리했더라고 장애로 인해 다음의 poll 루프로 돌아와서 auto commit이 되지 않았다면 메시지가 중복 처리될 수 있다는 부분을 알아둘 필요가 있다.
[ session.timeout.ms, request.timeout.ms, heartbeat.interval.ms ]
설정에 대한 설명
카프카 컨슈머는 자신이 살아 있음을 브로커에게 알려주기 위해 주기적으로 하트비트(heartbeat)를 전송하는데, 이때 하트비트를 보내는 주기를 설정하는 것이 heartbeat.interval.ms이다.
session.timeout.ms 설정은 하트비트에 대한 타임아웃 시간이다. 브로커가 session.timeout.ms 기간이 지나도록 하트비트를 받지 못하면, 해당 클라이언트를 그룹에서 제거하고 리밸런스를 시작한다. 해당 값은 브로커에 설정된 허용 범위인 group.min.session.timeout.ms와 group.max.session.timeout.ms 사이에 있어야 한다.
request.timeout.ms은 클라이언트가 요청에 대한 응답을 기다리는 최대 시간이다. 응답이 해당 타임아웃 내에 도착하지 않으면 클라이언트는 필요에 따라 요청을 재전송하거나 재시도 횟수가 모두 소진된 경우에 요청을 실패 처리한다.
카프카 공식 문서에서는 heartbeat.interval.ms이 반드시 session.timeout.ms보다 작아야 하며, 일반적으로 heartbeat.interval.ms을 session.timeout.ms의 1/3 이하로 설정할 것을 권장한다. 이를 통해 네트워크 지연이나 일시적인 장애가 있어도 consumer가 쉽게 죽었다고 판단하지 않기 때문이다. 해당 설정들은 특별한 이유가 없으면 기본값을 사용해주도록 하자.
session.timeout.ms=10000
request.timeout.ms=30000
heartbeat.interval.ms=3000
하트비트는 백그라운드 스레드에 의해 전송된다. 따라서 카프카에서 레코드를 읽어오는 메인 스레드는 데드락이 걸렸는데, 백그라운드 스레드는 멀쩡히 하트비트를 전송하고 있을 수도 있다. 따라서 max.poll..interval.ms와 같은 설정들이 보완으로 존재한다.
소스코드로 살펴보기
3개의 설정 중에서 request.timeout.ms만 소스코드 수준에서 간력히 살펴보록 하자. 다른 설정들은 소스코드까지 살펴볼 필요는 없다.
카프카에서 실제로 브로커로부터 메시지를 가져오는 컴포넌트는 Fetcher이며 그 중에서 실제 네트워크 관련 호출은 ConsumerNetworkClient가 책임진다.
따라서 request.timeout.ms과 같이 실제 네트워크 요청과 관련된 설정은 해당 클래스로 전달되어 활용된다.
그리고 해당 클래스는 다음과 같이 ClassicKafkaConsumer 내부에서 Fetcher 객체를 생성할 때 전달 되는 것을 확인할 수 있다.
[ max.poll.interval.ms, max.poll.records ]
설정에 대한 설명
컨슈머는 주기적으로 poll() 메서드를 호출해서 메시지를 가져오는데, 이때 컨슈머가 poll을 오래도록 호출하지 않으면 비정상이라고 판단하고 이를 제외시킬 필요가 있다. 이를 판단할 기준값으로 사용되는 설정이 max.poll.interval.ms이며, 카프카는 해당 시간을 초과할 경우 컨슈머를 그룹에서 내보내고 파티션을 재할당하게 된다.
max.poll.records는 컨슈머가 poll()을 한 번 호출했을 때 최대 몇 개의 레코드를 반환할지를 지정한다. 이는 단순히 poll() 호출당 반환되는 레코드 수의 상한선일 뿐, 브로커로부터 데이터를 가져오는 동작(fetch)과 무관하다. 앞서 설명하였듯 실제 브로커와 통신을 담당하는 컴포넌트는 Fetcher이며, Fetcher는 한 번 fetch 할 때 정해진 바이트의 크기만큼 데이터를 가져오지, 레코드의 건 수대로 데이터를 가져오지 않는다. 그리고 Fetcher를 통해 가져온 데이터는 내부 버퍼에 캐시되며 컨슈머는 해당 캐시된 데이터를 사용한다. 따라서 max.poll.records 값을 3으로 설정한다는 것은 브로커로부터 3개의 레코드를 가져오는 것이 아니라, “브로커로부터 가져온 데이터를 한 번에 몇 개까지 읽을 것인가”에 해당한다.
그렇다면 해당 설정은 어떠한 의미를 가질까? 메시지 처리 동작은 컨슈머 poll → 리스너 처리 → 컨슈머 poll → 리스너 처리 구조를 반복한다. 이때 자동 커밋을 처리하고 Fetcher가 데이터를 브로커로부터 가져오는 부분 역시 poll 내부에서 진행됨을 위에서 살펴보았다. 따라서 max.poll.records을 3으로 설정하면 3개의 메시지가 모두 처리된 후에 다시 poll을 진행하게 된다. 즉, 다음 fetch 처리와 오프셋 커밋 등이 max.poll.records 간격으로 되는 것이라고 볼 수 있다.
max.poll.interval.ms은 별도의 변경 없이 기본값인 5분(300000)을 사용하면 적당하다.
max.poll.records의 경우 1보다 크게 설정하면 하나의 메시지가 처리가 길어질 때 오프셋 커밋 등이 지연되며, 이로 인해 리밸런싱이 일어나며 중복 처리 가능성이 높아진다. 따라서 서비스 안정성이 중요하다면 해당 값을 작게 설정할수록 좋다. 참고로 오늘날엔 메시지 처리 시에 I/O 작업이 빠질 수 없기 때문에 이로 인한 처리 지연 등의 상황을 고려하여 1로 설정해주는 것이 좋은데, 이는 물론 서비스의 특성에 따라 조정해줄 필요가 있다.
max.poll.interval.ms=300000
max.poll.records=1
결과적으로 Fetcher는 버퍼에 저장된 레코드가 없는 경우에만 브로커에 요청을 보내게 되므로, 해당 값을 작게 설정에도 성능에 큰 영향을 주지는 않는다. 오히려 이어서 살펴볼 fetch 관련 설정들이 성능에 큰 영향을 준다.
소스코드로 살펴보기
max.poll.interval.ms이 리밸런싱을 위한 타임아웃으로 설정되는 부분은 GroupRebalanceConfig에서 확인할 수 있다.
max.poll.records가 활용되는 부분은 먼저 ClassicKafkaConsumer 내부의 poll 메서드에서 fetch를 수행하는 부분에 있다.
pollForFetches 내부로 이동하면 다음과 같이 fetcher를 통해 데이터를 가져오려고 하는 부분을 볼 수 있다. 그리고 주석에도 데이터가 이미 존재하는 경우 이를 즉시 반환한다고 적혀 있다.
그리고 실제 fetch를 수행하는 FetchCollector에 도달하게 되는데, 다음과 같이 maxPollRecords 만큼 반복문을 반복하며 한 번에 데이터를 읽고 이를 리스너에게 전달하게 되는 것이다.
[ fetch.min.bytes, fetch.max.bytes, fetch.max.wait.ms ]
설정에 대한 설명
fetch.min.bytes은 브로커가 fetch 요청에 응답하기 위해 준비해야 하는 최소 데이터 크기(바이트 단위)이다. 만약 컨슈머가 fetch 요청을 받은 시점에 설정된 양만큼의 데이터가 아직 준비되지 않았다면, 브로커는 해당 데이터가 누적될 때까지 요청을 대기시킨다.
하지만 그렇다고 컨슈머가 무한정 대기할 수는 없기에 fetch.max.wait.ms이라는 설정을 두어, 해당 시간을 초과한 경우 제공할 데이터가 없어도 응답을 주도록 하고 있다. 따라서 브로커는 fetch.min.bytes 만큼의 데이터를 채우거나 fetch.max.wait.ms 만큼 대기 시간이 초과하면 응답을 주는 것이다. 반대로 fetch.max.bytes는 컨슈머가 fetch 요청을 보낼 때 브로커가 반환할 수 있는 최대 데이터의 크기이다.
fetch.min.bytes 값을 크게 할수록 브로커 입장에서 더 많은 데이터를 누적한 후에 응답하기 때문에 효율적인 일괄 처리(batch process)가 가능해지지만 응답 지연 시간이 늘어날 수 있다. 따라서 처리 지연을 줄이고 싶다면 fetch.min.bytes을 낮추면서 fetch.max.wait.ms를 줄이고, 처리 효율을 높이고 싶다면 fetch.min.bytes 높이면서 fetch.max.bytes를 높일 수 있을 것이다.
해당 설정의 경우 기본값을 이용하면서 필요에 따라 튜닝해줄 필요가 있다.
fetch.min.bytes=1
fetch.max.bytes=5242880
fetch.max.wait.ms=500
참고로 fetch.min.bytes가 1이라고 하여도 항상 1바이트가 차면 응답을 주는 것은 아니다. 이는 단지 응답을 위한 요건에 해당할 뿐이며, 실제 브로커가 컨슈머에게 전달되는 크기는 1을 훨씬 상회할 수 있다. 즉, 1바이트 이상의 데이터가 준비되면 즉시 응답할 뿐이며 한 번에 fetch 해오는 레코드의 수와도 무관하다.
실제로 테스트를 해보면 fetch.min.bytes가 1임에도 불구하고 브로커에 여러 메시지가 존재하는 경우에 fetch.max.bytes 이하 크기만큼 일괄 전달되는 것을 확인할 수 있다.
만약 컨슈머 랙이 존재하는 상황이라면 fetch.max.wait.ms 주기가 매우 짧을 것이다. 하지만 랙이 존재하지 않으며 데이터가 없는 경우에는 fetch.max.wait.ms 만큼의 주기로 fetch 요청을 브로커로 보내게 될 것이다. 만약 담당하는 카프카의 fetch 주기가 fetch.max.wait.ms를 초과해서 걸리는 상황이라면, 브로커에 문제가 없는지 주의깊게 살펴볼 필요가 있다.
참고로 fetch를 위한 정보들은 각각의 Fetch 요청에 포함되어 전송한다.
소스코드로 살펴보기
해당 부분은 소스코드 레벨까지 살펴볼 필요는 없을 것 같다.
[ auto.offset.reset ]
설정에 대한 설명
카프카에 초기 오프셋이 없거나, 현재 오프셋이 서버에 존재하지 않는 경우 어떻게 수행할 것인가를 결정하는 것이 auto.offset.reset 옵션이다. 해당 옵션은 다음과 같은 설정들을 가질 수 있다.
- earliest: 오프셋을 가장 이른 오프셋으로 자동 리셋
- latest: 오프셋을 가장 마지막 오프셋으로 자동 리셋
- by_duration:<duration>: 현재 시각으로부터 지정된 <duration> 만큼 과거로 계산된 위치로 오프셋 자동 리셋함(ISO8601 형식만 지원하며 음수는 허용되지 않음)
- none: 컨슈머 그룹에 이전 오프셋이 없을 경우 예외(Exception)를 발생시킴
- 기타 값: 컨슈머에게 예외 발생
해당 설정의 경우 매우 중요하기 때문에, 서비스의 상황에 맞게 올바르게 설정해줄 필요가 있다. 보통 earliset와 latest 만을 사용하기 때문에 두 값을 사용해야 하는 케이스를 위주로 살펴보도록 하자.
새로운 토픽이며 아직 발행된 메시지가 없어서 오프셋이 존재하지 않는 경우에는 earliest, latest 어느 것을 사용해도 영향이 없다. 하지만 대부분의 경우 이미 발행중인 토픽에 연동을 하게 될텐데, 이때는 반드시 latest로 설정해주어야 하며 그렇지 않고 earliest로 설정하면 이미 발행되었던 과거의 모든 데이터들을 처음부터 처리하게 될 것이다. 따라서 초기에 컨슈머 연동을 하는 경우에는 latest로 설정해주면 어떠한 문제도 없으며, 기본값 역시 latest에 해당한다.
auto.offset.reset=latest
하지만 발행되는 메시지의 수를 현재의 컨슈머 수로 감당하지 못하는 경우에 파티션의 수를 늘려줘야 할 수 있다. 파티션 수를 변경(추가)하는 경우 해당 설정을 earlist로 하지 않으면, 메시지 손실이 발생할 수 있다.
왜냐하면 프로듀서가 새로 추가된 파티션으로 메시지를 보내기 시작하지만, 해당 시점에 컨슈머는 아직 리밸런싱이 진행중일 수 있다. 따라서 이미 신규 파티션에 메시지가 쌓인 상태에서 신규 컨슈머가 latest로 붙게 되면 리밸런싱이 되는 순간으로 인해 일부 메시지 누락이 생길 수 있다.
예를 들어 다음과 같이 2개의 파티션이 존재하는 상황이라고 하자.
컨슈머 그룹 | 토픽 | 파티션 | 오프셋 위치 |
group-A | my-topic | partition-0 | ex: 105 |
group-A | my-topic | partition-1 | ex: 87 |
새롭게 파티션을 추가했다면, 프로듀서는 해당 파티션에 이미 데이터를 쌓기 시작했을 것이다.
컨슈머 그룹 | 토픽 | 파티션 | 오프셋 위치 |
group-A | my-topic | partition-0 | ex: 105 |
group-A | my-topic | partition-1 | ex: 87 |
group-A | my-topic | partition-1 | ex: 3 |
컨슈머는 아직 리밸런싱을 하는 중이라서 프로듀서에 비해 파티션에 접근하는 시점이 느릴 수 밖에 없다. 그런데 리밸런싱이 끝난 후 latest 옵션으로 설정된 경우, 신규 파티션에 대해 읽지 못하는 메시지가 존재할 것이다.
- 파티션의 개수가 변경될 경우 컨슈머는 metadata.max.age.ms 만큼의 메타데이터 리프레시 기간 이후 리밸런싱이 일어나면서 파티션 할당과정을 거치게 됨
- 문제는 메타데이터 리프레시가 되기 이전에 새로운 파티션에 데이터가 여러 건 들어올 수 있다는 점임
여러 데이터가 쌓이고 이후에 auto.offset.reset이 latest인 컨슈머가 붙게 되면, 가장 최신의 offset부터 데이터를 읽어들여 이전 데이터의 소비가 누락될 수 있다. 따라서 파티션을 증설하는 경우에는 해당 설정값을 반드시 earlist로 잡아줄 필요가 있다.
auto.offset.reset=earlist
소스코드로 살펴보기
해당 부분 역시 소스코드 레벨까지 살펴볼 필요는 없을 것 같다.
[ partition.assignment.strategy ]
설정에 대한 설명
해당 내용은 하나의 컨슈머 그룹이 파티션을 나누어가질 때, 여러 컨슈머 인스턴스 간에 파티션 소유권을 어떻게 분배할지를 결정하는 설정이다. 다음과 같은 설정들이 존재한다.
- org.apache.kafka.clients.consumer.RangeAssignor
- 토픽별로 파티션을 순차적으로 나누어 할당
- 예: 6개 파티션, 2명의 컨슈머 → [0,1,2], [3,4,5]
- org.apache.kafka.clients.consumer.RoundRobinAssignor
- 파티션을 라운드로 분산하여 할당
- 예: 파티션 0 → 컨슈머 A, 파티션 1 → 컨슈머 B, 파티션 2 → A, ...
- org.apache.kafka.clients.consumer.StickyAssignor
- 최대한 균형 있는 할당을 유지하면서, 이전 파티션 할당을 최대한 유지하는 방식
- org.apache.kafka.clients.consumer.CooperativeStickyAssignor
- StickyAssignor와 동일한 로직 기반
- 단, Cooperative Rebalancing을 지원하여 리밸런스 중 기존 파티션을 즉시 해제하지 않음(다운타임 최소화)
해당 부분의 기본값은 RangeAssignor, CooperativeStickyAssignor인데 리밸런싱을 안전하게 처리하기 위해 이렇게 할당되어 있다고 한다.
소스코드로 살펴보기
해당 부분 역시 소스코드 레벨까지 살펴볼 필요는 없을 것 같다.
참고자료
- https://kafka.apache.org/documentation/#brokerconfigs
- https://kafka.apache.org/documentation/#consumerconfigs
- https://www.lydtechconsulting.com/blog-kafka-rebalance-part1.html
- https://d2.naver.com/helloworld/0974525
- https://blog.voidmainvoid.net/514
'Server' 카테고리의 다른 글
[Server] COW(CopyOnWrite) 기법과 이를 활용하는 자바와 레디스의 예시들(COW on Java and Redis) (2) | 2025.06.10 |
---|---|
[Server] Logback AsyncAppender의 동작 방식과 neverBlock 설정의 필요성 (0) | 2025.05.27 |
[Server] K6 부하 테스트 시나리오 작성하고 결과 지표 분석하기(K6 Load Testing) (2) | 2025.04.01 |
[LLM] MCP(Model Context Protocol)에 대하여 알아보고 IntelliJ와 Claude를 MCP로 연동하기 (11) | 2025.03.25 |
[Redis] Lettuce를 사용하는 경우에 MGET 동작 방식과 해시태그(HashTag) (4) | 2025.03.18 |