Kafka/Kafka

카프카 컨슈머와 그룹 코디네이터

seungh0 2023. 4. 5. 01:24
반응형

컨슈머 오프셋 관리

  • 컨슈머의 동작 중 가장 핵심은 오프셋 관리.
    • 오프셋이란 메시지의 위치를 나타내는 위치
  • 컨슈머는 카프카에 저장된 메시지를 꺼내오는 역할을 하기 때문에 컨슈머가 메시지를 어디까지 가져왔는지를 표시하는 것은 매우 중요.
  • 컨슈머 그룹은 자신의 오프셋 정보를 카프카에서 가장 안전한 저장소인 토픽에 저장.
    • __consumer_offsets 토픽에 각 컨슈머 그룹별로 오프셋 위치 정보가 기록됨.
    • 모든 컨슈머 그룹의 정보가 저장되는 __consumer_offsets 토픽은 파티션 수와 리플리케이션 팩터 수를 갖고 있으며, 내부 토픽이지만 파티션 수와 리플리케이션 팩터수는 브로커의 설정 파일인 server.properties에서 변경 가능.
      • offsets.topic.num.partitions (기본값 50)
      • offsets.topic.replication.factor (기본값 3)
        • 파티션 수와 리플리케이션 팩터 수는 기본값만 사용해도 충분한대, 카프카 클러스터 초기에 잘못 설정해 리플리케이션 팩터 수가 1로 설정되어 있는 경우도 있으니 꼭 확인이 필요


그룹 코디네이터

  • 컨슈머들은 하나의 컨슈머 그룹의 구성원으로 속하며, 컨슈머 그룹 내의 각 컨슈머들은 서로 자신의 정보를 공유하며 하나의 공동체로 동작함
  • 컨슈머 그룹은 컨슈머 합류 등 변화를 인지하고 각 컨슈머들에게 작업을 균등하게 분배해야 하는데, 이러한 동작을 컨슈머 리밸런싱이라고 한다.
  • 카프카의 그룹 코디네이터가 이를 관리한다.
    • 컨슈머 그룹이 구독한 토픽의 파티션들과 그룹의 멤버들을 트래킹 하는 것.
    • 파티션 또는 그룹의 멤버에 변화가 생기면, 작업을 균등하게 재분배하기 위해 컨슈머 리밸런싱 동작이 발생.
    • 그룹 코디네이터는 각 컨슈머 그룹별로 존재하며, 카프카 클러스터 내의 브로커 중 하나에 위치.


컨슈머 그룹 등록 과정

  • 컨슈머는 컨슈머 설정 값 중에서 bootstrap.brokers 리스트에 있는 브로커에게 컨슈머 클라이언트와 초기 커넥션을 연결하기 위한 요청을 보냄
  • 해당 요청을 받은 브로커는 그룹 코디네이터를 생성하고 컨슈머에게 응답을 보냅니다.
  • 그룹 코디네이터는 group.initial.rebalance.delay.ms의 시간동안 컨슈머의 요청을 기다린다.
  • 컨슈머는 컨슈머 등록 요청을 그룹 코디네이터에게 보내며, 가장 먼저 요청을 보내는 컨슈머가 컨슈머 그룹의 리더가 된다.
  • 컨슈머 등록 요청을 받은 그룹 코디네이터는 해당 컨슈머 그룹이 구독하는 토픽 파티션 리스트 등을 리더 컨슈머의 요청에 응답을 보냄.
  • 리더 컨슈머는 정해진 컨슈머 파티션 할당 전략에 따라 그룹 내 컨슈머들에게 파티션을 할당한 뒤 그룹 코디네이터에게 전달한다.
  • 그룹 코디네이터는 해당 정보를 캐시하고 각 그룹 내 컨슈머들에게 성공을 알린다.
  • 각 컨슈머들은 각자 지정된 토픽 파티션으로부터 메시지들을 가져온다.

컨슈머들은 현재 자신들이 속한 컨슈머 그룹에서 빠져나갈 수도, 새롭게 합류할 수도 있음.

  • 이러한 컨슈머 그룹의 변화들은 컨슈머 코디네이터에게 컨슈머가 join 또는 leave 요청을 보냄으로써 자연스럽게 처리된다.
  • 하지만 컨슈머가 장애로 leave 요청을 보내지 못하고 종료되는 경우는 그룹 코디네이터는 어떻게 이를 감지할 수 있을까? → 컨슈머들의 변경을 감지하기 위해 그룹 코디네이터와 컨슈머들은 서로 하트비트를 주고 받는다.
    • 하트비트를 주기적으로 주고받으면서 그룹 코디네이터는 컨슈머가 살아 있는지, 잘 동작하는지를 확인.
heartbeat.interval.ms=3000 
# 그룹 코디네이터와 하트비트 인터벌 시간 (session.timeout.ms 보다 낮게 설정해야 하며 1/3 수준이 적절)

session.timeout.ms=10000
# 컨슈머가 특정 시간 안에 하트비트를 받지 못하면 문제가 발생했다고 판단해 컨슈머 그룹에서 해당 컨슈머는 제거되고 리밸런싱 동작이 일어난다.

max.poll.interval.ms=300000
# 컨슈머는 주기적으로 poll()을 호출해 토픽으로부터 레코드를 가져오는데, poll()호출 후 최대 5분간 poll()호출이 없다면 컨슈머가 문제가 있는 것으로 판단해 리밸런싱 동작이 일어난다.

  • 그룹 코디네이터는 하트비트 옵션을 토해 컨슈머의 상태를 확인하며, 특정 컨슈머에 문제가 발생했다고 판단되면 컨슈머 리밸런싱 동작을 통해 컨슈머 그룹의 전체 균형을 다시 맞춘다.
반응형