Kafka/Kafka

카프카 프로듀서

seungh0 2023. 4. 4. 02:28
반응형

파티셔너

  • 카프카의 토픽은 성능 향상을 위한 병렬 처리가 가능하도록 하기 위해 파티션으로 나뉩니다. (최소 하나 또는 둘 이상의 파티션)
  • 그리고 프로듀서가 파티션으로 전송한 메시지는 해당 토픽 내 각 파티션의 로그 세그먼트에 저장된다.
  • 따라서 프로듀서는 토픽으로 메시지를 보낼 때 해당 토픽의 어느 파티션으로 메시지를 보내야 할지를 결정해야 하는데 이때 사용하는 것이 파티셔너.
  • 프로듀서가 파티션을 결정하는 알고리즘은 기본적으로 메시지(레코드)의 키를 해시처리해 파티션을 구하는 방식을 사용해서 메시지의 키 값이 동일하면 해당 메시지들은 모두 같은 파티션으로 전송된다.
    • 주의할 점은 프로듀서가 동일한 메시지의 키를 이용해 메시지를 전송하더라도 파티션의 수를 늘린 후에는 다른 파티션으로 전송될 수 있다.
    • 따라서 메시지의 키를 이용해 카프카로 메시지를 전송하는 경우, 되도록 파티션의 수를 변경하지 않는 것을 권장.

라운드 로빈 전략

  • 프로듀서의 메시지 중 레코드의 키 값은 필수값이 아니므로, 키 값을 지정하지 않고 메시지를 전송할 수 있다. (null) → 기본값인 라운드 로빈 알고리즘을 사용해 프로듀서는 목적지 토픽의 파티션들로 레코드들을 랜덤 전송.
  • 파티셔너를 거친 레코드들은 배치 처리를 위해 프로듀서의 버퍼 메모리 영역에서 잠시 대기한 후 카프카로 전송된다. → 배치 처리를 위해 잠시 메시지들이 대기하는 과정에서 라운드 로빈 전략은 효율을 떨어트릴 수 있다.

스티키 파티셔닝 전략

  • 라운드 로빈 전략에서 지연시간이 불필요하게 증가되는 비효율적인 전송을 개선하기 위해 아파치 카프카 2.4 버전부터는 스티키 파티셔닝 전략을 사용하게 된다.
  • 라운드 로빈 전략에서 배치 전송을 위한 필요 레코드 수를 채우지 못해 카프카로 배치 전송을 하지 못했던 것과 달리, 스티키 파티셔닝이란 하나의 파티션에 레코드 수를 먼저 채워서 카프카로 빠르게 배치 전송하는 전략.
  • 카프카로 전송하는 메시지의 순서가 그다지 중요하지 않은 경우라면 스티키 파티셔닝 전략을 적용하기를 권장.


프로듀서의 배치

  • 카프카에서는 토픽의 처리량을 높이기 위한 방법으로 토픽을 파티션으로 나눠 처리하며, 프로듀서에서 처리량을 높이기 위해 배치 전송을 권장.
  • 프로듀서에서는 카프카로 전송하기 전, 배치 전송을 위해 토픽의 파티션별로 레코드들을 잠시 보관하고 있는다.

  • buffer.memory: 카프카로 메시지들을 전송하기 위해 담아두는 프로듀서의 버퍼 메모리 옵션. (기본값 32MB)
  • batch.size: 배치 전송을 위해 메시지들을 묶는 단위 (기본값 16KB)
  • linger.ms: 배치 전송을 위해 버퍼 메모리에서 대기하는 메시지들의 최대 대기시간 (기본값 0, 배치 전송을 위해 기다리지 않고 메시지들이 즉시 전송)
  • 배치 전송은 불필요한 I/O를 줄일 수 있어, 매우 효율적이며, 카프카의 요청 수를 줄여주는 효과도 존재.
    • 목적에 따라 처리량을 높일지, 지연 없는 전송을 해야 할지 선택
      • 처리량을 높여야 하는 경우 → 효율적인 배치 전송을 위해 프로듀서의 설정을 변경 (batch.size, linger.ms의 값을 크게 설정)
        • 주의할 점은 버퍼 메모리의 크기가 충분히 커야 함. (buffer.memory는 batch.size보다 커야 한다
        • buffer.memory: batch.size * 파티션 개수 * N (전송 실패 시 재시도 고려)
      • 지연 없는 전송이 목표 → 프로듀서의 배치 전송 관련 설정을 제거 (batch.size, linger.ms의 값을 작게 설정)
  • 배치 전송과 더불어 압축 기능을 같이 사용한다면, 프로듀서는 메시지들을 더욱 효율적으로 카프카로 전송할 수 있다.
    • 높은 압축률 → gzip, zstd
    • 낮은 지연시간 → lz4, snappy


메시지 전송 방식

  • 메시지 시스템들의 메시지 전송 방식에는 “적어도 한 번 전송”, “최대 한 번 전송”, “정확히 한 번 전송”이 존재.

적어도 한 번 전송

  • 프로듀서가 브로커의 특정 토픽으로 메시지 A를 전송.
  • 브로커는 메시지 A를 기록하고, 잘 받았다는 ACK를 프로듀서에게 전송하려 하지만, 네트워크 오류 또는 브로커 장애가 발생하여 프로듀선느 메시지 A에 대한 ACK를 받지 못함.
  • 메시지 A를 전송한 후 브로커로부터 ACk를 받지 못한 프로듀서는 브로커가 메시지 A를 받지 못했다고 판단해 메시지 A를 재전송한다.

네트워크의 회산 장애나 기타 장애 상황에 따라 일부 메시지 중복이 발생할 수는 있지만, 최소 하나의 메시지는 반드시 보장한다는 것 → 적어도 한 번 전송 방식 (기본 설정)


최대 한 번 전송

  • 프로듀서가 메시지 A를 브로커에게 전송.
  • 브로커는 메시지 A를 기록하지 못하고, 잘 받았다는 ACK를 프로듀서에게 전송하지 못한다.
  • 프로듀서는 브로커가 메시지 A를 잘 받았다고 가정하고, 다음 메시지를 전송한다.

최대 한 번 전송은 ACK를 받지 못하더라도 재전송하지 않는다 → 일부 메시지가 손실되더라도 높은 처리량을 필요로 하는 대량의 로그 수집이나 IoT 같은 환경에서 사용

  • 메시지 손실 가능성은 있지만 메시지 중복 가능성은 없음.


중복 없이 전송

  • 프로듀서가 브로커의 특정 토픽으로 메시지 A를 전송한다. 이때 PID 0과 메시지 번호 0을 헤더에 포함해 함께 전송.
  • 브로커는 메시지 A를 저장하고, PID와 메시지 번호 0을 메모리에 기록. 그리고 메시지를 잘 받았다는 ACK를 프로듀서에게 응답한다.
  • 만약 문제가 생겨 재전송시, 프로듀서가 재전송한 메시지의 헤더에서 PID와 메시지 번호를 비교해서 메시지가 이미 브로커에 저장되어 있는 것을 확인한 브로커는 메시지를 중복 저장하지 않고 ACK만 보냅니다.

여기서 사용되는 메시지 번호는 시퀀스 번호라고 하며 0번부터 시작해서 순차적으로 증가한다, PID의 경우 사용자가 별도로 생성하는 것이 아니며 프로듀서에 의해 자동 생성된다.

  • 중복 없는 전송 방식은 중복을 피하기 위해 메시지 비교 동작에는 오버헤드가 존재한다. (최대 약 20% 정도의 성능 감소)
  • 프로듀서 전송 성능에 그다지 민감하지 않은 상황에서 중복 없는 메시지 전송이 필요하다면 이 방식을 설정해 적용할 것을 권장한다.
enable.idempotence=true 
# 프로듀서가 중복 없는 전송을 허용할지 결정하는 옵션 (기본값 false)

max.in.flight.requests.per.connection=1~5 
# ACK를 받지 않은 상황에서 하나의 커넥션에서 보낼 수 있는 최대 요청 수 (기본 값 5, 5이하로 설정 필요)

acks=all
# 기본 값은 1이며, all로 설정

retries=5
# ACK를 받지 못한 경우 재시도를 해야 하므로 0보다 큰 값으로 설정


정확히 한 번 전송

  • 중복 없는 전송 방식이 정확히 한 번 전송한다는 의미는 아님. 카프카에서 정확히 한 번 전송은 트랜잭션과 같은 전체 프로세스 처리를 의미하며, 중복 없는 전송은 정확히 한 번 전송의 일부 기능이라고 할 수 있음.
  • 전체적인 프로세스를 관리하기 이해 카프카에서는 정확히 한 번 처리를 담당하는 별도의 프로세스가 존재하는데, 트랜잭션 API라고 한다.

디자인

  • 프로듀서가 카프카로 정확히 한 번 전송 방식으로 메시지를 전송할 때, 프로듀서가 보내는 메시지들은 원자적으로 처리되어 전송에 성공하거나 실패하게 된다.
    • 이런 프로듀서의 전송을 위해 카프카에는 컨슈머 그룹 코디네이터와 동일한 개념으로 트랜잭션 코디네이터라는 것이 서버 측에 존재하며, 프로듀서에 의해 전송된 메시지들을 관리하며, 커밋 또는 중단 등을 표시한다.
    • 트랜잭션 로그를 카프카의 내부 토픽인 _transaction_state에 저장한다.
    • 기본 값은 transaction.state.log.num.partitions=50, transaction.state.log.replicatioin.factor=3.
  • 정확히 한 번 전송을 이용해 전송된 메시지들이 카프카에 저장되면, 카프카의 메시지를 다루는 클라이언트들은 해당 메시지들이 정상적으로 커밋된 것인지 또는 실패한 것인지 식별할 수 있어야 한다. → 컨트롤러 메시지라고 불리는 특별한 타입의 메시지가 추가로 사용된다.
    • 컨트롤 메시지는 페이로드에 애플리케이션 데이터를 포함하지 않으며, 애플리케이션들에게 노출되지 않는다. (오직 브로커와 클라이언트 통신에서만 사용된다)
transation.id.config=abc-transaction-1
# 중복 없는 전송과 정확히 한 번 전송의 옵션 설정에서 가장 큰 차이점이자 주의해야할 설정으로, 
# 실행하는 프로듀서 프로세스마다 고유한 아이디로 설정해야 한다.

enable.idempotence=true 
max.in.flight.requests.per.connection=1~5
acks=all
retries=5


단계별 동작

  • 가장 먼저 수행하는 작업은 트랜잭션 코디네이터 찾기
    • 프로듀서는 브로커에게 FindCoorindator Request를 보내서 트랜잭션 코디네이터의 위치를 찾는다.
    • 컨슈머 코디네이터와 유사한 역할을 하는 트랜잭션 코디네이터는 브로커에 위치한다.
    • 주 역할은 PID와 transactio.id를 매핑하고 해당 트랜잭션 전체를 관리하는 것
    • _transaction_state 토픽의 파티션 번호는 transaction.id를 기반으로 해시하여 결정되고, 이 파티션의 리더가 있는 브로커가 트랜잭션 코디네이터의 브로커로 최종 선정된다
  • 프로듀서 초기화 동작
    • 프로듀서는 initTransactions() 메소드를 이용해 트랜잭션 전송을 위한 InitPidRequest를 트랜잭션 코디네이터로 보낸다. 이때 TID가 설정된 경우에는 InitPidRequest와 함께 TID가 트랜잭션 코디네이터에게 전송된다.
      • 트랜잭션 코디네이터는 TID, PID를 매핑하고 해당 정보를 트랜잭션 로그에 기록한다. 그리고 PID 에포크를 한 단계 올리는 동작을 하게 되고, PID 에포크가 올라감에 따라 이전의 동일한 PID와 이전 에포크에 대한 쓰기 요청은 무시된다.
  • 트랜잭션 시작 동작
    • 프로듀서는 beginTransaction() 메소드를 이용해 새로운 트랜잭션의 시작을 알리게 된다.
  • 트랜잭션 상태 추가 동작
    • ID와 파티션의 정보가 트랜잭션 로그에 기록되며, 트랜잭션의 현재 상태를 Ongoing으로 표시한다.
    • 만약 트랜잭션 로그에 추가되는 첫 번째 파티션이며, 트랜잭션 코디네이터는 해당 트랜잭션에 대한 타이머를 시작한다 (기본 값으로 1분 동안 트랜잭션 상태에 대한 업데이트가 없다면, 해당 트랜잭션은 실패로 처리된다)
  • 메시지 전송 동작
    • 프로듀서는 대상 토픽의 파티션으로 메시지를 전송한다.
      • 메시지에는 PID, 에포크, 시퀀스 번호가 함께 포함되어 전송된다.
  • 트랜잭션 종료 요청 동작
    • 메시지 전송을 완료한 프로듀서는 commit Tranaction() 또는 abortTrasanction() 메소드 중 하나를 반드시 호출해야 하며, 해당 메소드의 호출을 통해 트랜잭션이 완료됨을 트랜잭션 코디네이터에게 알린다.
    • 트랜잭션 코디네이터는 두 단계 커밋 과정을 시작하게 되며, 첫 번째 단계로 트랜잭션 로그에 해당 트랜잭션에 대한 PrepareCommit 또는 PrepareAbort를 기록한다.
  • 사용자 토픽에 표시하는 단계이며, 트랜잭션 로그에 기록된 토픽의 파티션에 트랜잭션 커밋 표시를 기록한다. (여기서 기록하는 메시지가 컨트롤 메시지)
  • 트랜잭션 완료
    • 트랜잭션 코디네이터는 완료됨이라고 트랜잭션 로그에 기록한다. 그리고 프로듀서에게 해당 트랜잭션이 완료됨을 알린 다음 해당 트랜잭션에 대한 처리는 모두 마무리된다.
    • 트랜잭션을 이용한 컨슈머는 read_commiited 설정을 하면 트랜잭션에 성공한 메시지들만 읽을 수 있게 된다.
반응형