Kafka: a Distributed Messaging System for Log Processing

Kafka: A distributed messaging system for log processing In Proceedings of 6th International Workshop on Networking Meets Databases (NetDB), Athens, Greece (2011) by J. Kreps, N. Narkhede, J. Rao

Kafka는 기존의 메시징 시스템에서 당연하다고 가정하고 있었지만, 로그 처리 시스템에서는 필요없는 보장들을 과감하게 버리고, 성능 위주의 설계를 함으로써, 실제로 링크 속도에 육박하는 성능을 보여주고 있고, 상당히 단순한 아키텍쳐를 유지하고 있는 점이 흥미로운 점입니다. 그리고, 현실의 데이터 처리에 있어서는 버그나 장애로 인해 흔히 발생하는 재처리가 pull 기반의 소비 모델을 도입함으로써 매우 쉽게 가능해졌다는 점도 눈여겨 볼 부분입니다.

주의: 이 글은 2011에 출판된 Kafka의 페이퍼 내용을 다루고 있으며, 현재의 Kafka 버전의 내용을 다루고 있지 않습니다.

Problem

기존의 엔터프라이즈 메시징 시스템들의 한계를 다음과 같은 이유들로 설명하고 있습니다.

  • 배달 보장 (delivery guarantee)을 위한 기능들은 로그 처리를 위한 시스템 입장에서는 불필요..
  • 처리속도 (throughput)를 디자인 제약으로 고려하지 않음.
  • 분산에 대한 고려가 부족.
  • 메시지가 즉시 소비되는 것을 전제로 하기 때문에 메시지가 쌓일 경우 성능이 하락.

마찬가지로 최근에 만들어진 로그 처리 시스템들 – Scribe, Yahoo’s data highway project, Flume의 한계는 다음과 같은 이유들로 설명하고 있습니다.

  • 로그 데이터를 오프라인으로 처리하는 것을 위해 만들어진 시스템.
  • 대부분은 push 모델.

Kafka Architecture and Design Principles

Kafka의 주요 개념들과 개략적인 디자인은 다음과 같습니다.

kafka_architecture

  • 토픽 (topic): 특정한 타입의 메시지의 스트림
  • 프로듀서 (producer): 어떤 토픽에 대해 메시지를 발행 (publish)할 수 있음.
  • 프로듀서에 의해 발행된 메시지들은 브로커 (broker)라는 서버들에 저장됨. Kafka 클러스터는 일반적으로 여러 브로커들로 이루어짐.
  • 컨수머 (consumer)는 브로커로부터 1개 이상의 토픽을 구독할 수 있고, 브로커들로부터 데이터를 당김(pull)으로써 구독한 메시지들을 소비 (consume)할 수 있음.
    • point-to-point 배달 모델: 여러 컨수머가 하나의 토픽 내의 메시지를 하나씩 소비.
    • 발행/구독 (publish/subscribe) 모델: 여러 컨수머가 하나의 토픽의 각자의 복제본 메시지들을 소비.
  • 토픽은 여러 파티션들로 이루어져 있고, 각각의 브로커는 하나 이상의 파티션을 저장.

Efficiency on a Single Partition

Simple storage

  • 어떤 토픽의 각 파티션은 논리적인 로그에 해당.
  • 물리적으로 로그는 거의 동일한 크기의 세그먼트 파일들의 집합으로 구현됨.
  • 프로듀서가 어떤 파티션에 메시지를 발행할 때마다 브로커는 마지막 세그먼트 파일에 메시지를 append 한다.
  • 성능을 위해서 특정 개수의 메시지가 발행되거나 특정 시간이 흐른 후에 세그먼트 파일을 디스크로 flush한다.
    • 메시지는 flush가 된 후에 컨수머에게 노출됨.
  • 전형적인 메시징 시스템과 달리, 카프카의 메시지에는 메시지 식별자가 없고 로그 상의 논리적인 오프셋 (offset)을 주소로 사용.
    • 메시지 식별자로부터 메시지를 찾기 위한 인덱스 구조를 유지할 필요가 없어짐.
    • Kafka의 메시지 식별자 – 로그 상의 논리적인 오프셋은 자연히 증가하지만, 연속적이지는 않음.
  • 컨수머는 항상 특정한 파티션으로부터 연속적으로 메시지를 소비함.
  • 컨수머가 특정한 메시지 오프셋에 대해 ack을 한다면, 이는 그 파티션의 해당 오프셋 이전의 모든 메시지를 받았음을 의미.
  • 컨수머는 브로커에게 비동기적인 당김 (pull) 요청을 보낸다. 각 요청은 소비할 메시지의 오프셋과 최대로 가져올 메시지의 크기를 포함.
    • 브로커는 모든 세그먼트 파일의 첫번째 메시지의 오프셋들의 목록을 메모리 상에 유지.
    • 브로커는 그 목록을 세그먼트 파일을 찾고, 컨수머에게 데이터를 보낸다.
    • 컨수머가 메시지를 받은 후에는 다음 메시지의 오프셋을 계산해서 다음 요청에서 사용.

kafka_log

Efficient transfer

  • 프로듀서는 한번의 send 요청에 여러 메시지를 보낼 수 있음.
  • 컨수머 API는 한번에 하나의 메시지를 소비하는 것처럼 보이지만, 내부적으로는 각 당김 요청은 특정 사이즈 (보통 수백 KB) 내에 해당하는 여러 메시지를 가지고 옵니다.
  • 메시지들을 직접 메모리 상에 캐싱을 하지 않고 파일 시스템 페이지 캐시에 의존함.
    • 중복된 버퍼링을 피함.
    • 프로세스 내에서 메시지를 캐시하지 않기 때문에 가비지 컬렉션의 오버헤드가 매우 적어짐.
    • 프로듀서와 컨수머는 세그먼트 파일을 순차적으로 액세스하고, 보통 컨수머는 프로듀서보다 살짝 뒤쳐지기 때문에, 보통의 OS 캐싱 휴리스틱 (write through, read-ahead)이 매우 효과적으로 동작함.
  • sendfile API를 이용해서 복사와 시스템 콜 회수를 줄임.

Stateless broker

  • 대부분의 다른 메시징 시스템과 다르게, 각각의 컨수머가 얼마나 소비했는지는 브로커가 유지하지 않음.
    • 이러한 디자인은 브로커의 복잡도와 오버헤드를 줄임.
  • 문제는 이미 소비한 메시지를 언제 지울지 결정하는 것이 어려워짐.
    • Kafka는 시간 기준의 SLA를 사용해서 해결하는데, 특정 기간 (보통 1주일) 이상이 지나면 메시지는 자동적으로 삭제됨.
      • 대부분의 컨수머는 비교적 짧은 시간 단위 – 매일, 시간별, 실시간 – 내에 소비를 마치기 때문에 문제가 없음.
      • 카프카는 데이터가 크기에 따라서 성능이 저하되지 않으므로 이러한 방법이 가능.
  • 부가적인 이점으로, 컨수머는 이전의 오프셋으로 되돌아가 메시지를 다시 소비하는 것이 가능해짐.
    • 예를 들어, 컨수머의 로직에 에러가 있을 경우, 에러가 수정된 후에 문제가 된 메시지들을 다시 처리하는 것이 가능.
    • 컨수머가 크래시한다면 flush되지 않은 데이터는 잃어버리겠지만, flush하지 않은 가장 작은 오프셋부터 메시지를 다시 소비하는 것이 가능.
    • 이러한 메커니즘은 push 모델보다 pull 모델에서 훨씬 쉬움.

3.2 Distributed Coordination

  • 각각의 프로듀서는 랜덤하게 선택된 파티션이나 파티셔닝 키와 파티셔닝 함수에 의해 결정된 파티션으로 메시지를 발행할 수 있다.
  • 컨수머 그룹 (consumer group)이라는 개념
    • 토픽의 집합을 consume하는 하나 이상의 컨수머들로 구성.
    • 서로 다른 컨수머 그룹은 독립적으로 구독한 메시지들의 전체 집합을 consume하고, 컨수머 그룹 끼리는 조정?(coordination)이 불필요.
  • 어떤 토픽 내에서 하나의 파티션은 병행성의 최소 단위
    • 주어진 시점에 하나의 파티션으로부터의 모든 메시지는 각 컨수머 그룹 내에서 하나의 컨수머에 의해서만 소비됨.
    • 하나의 파티션을 여러 컨수머가 동시에 소비하도록 했다면 누가 어떤 메시지를 소비할지는 조정해야하는 오버헤드가 발생.
    • 로드를 균형 있게 맞추기 위해서는 파티션의 수는 컨수머의 수보다 더 많아지도록 유지.
  • 시스템을 단순화하기 위해 중앙의 마스터 노드를 가지지 않도록 함.
    • 조정을 위해서 Zookeeper를 사용.
    • Kafka가 ZooKeeper를 사용하는 작업들
      • 브로커와 컨수머의 추가와 제거를 탐지
      • 브로커나 컨수머가 추가/제거될 경우 각 컨수머에게 리밸런스 프로세스를 트리거.
      • 소비 관계를 유지하고, 각 파티션의 소비된 오프셋을 추적
        • 브로커나 컨수머가 시작하면 주키퍼의 브로커 또는 컨수머 레지스트리에 그 정보를 저장.
        • 브로커 레지스트리는 브로커의 호스트 이름과 포트, 토픽과 파티션의 집합을 포함.
        • 컨수머 레지스트리는 컨수머가 속한 컨수머 그룹과, 구독하고 있는 토픽들의 집합을 포함.
        • 각 컨수머 그룹은 소유권 레지스트리와 오프셋 레지스트리와 연관됨.
        • 소유권 레지스트리는 모든 구독된 파티션에 대해 하나의 ZooKeeper 경로를 가지며, 그 값은 현재 그 파티션을 소비하고 있는 컨수머의 ID.
        • 오프셋 레지스트리는 각각의 구독된 파티션에 대해 파티션 내에서 마지막으로 소비된 오프셋을 저장.
      • 브로커 레지스트리, 컨수머 레지스트리, 소유권 레지스트리에 대한 주키퍼 경로들은 ephemeral 경로.
      • 오프셋 레지스트리에 대해서는 persistent 경로.
      • 브로커가 실패하면 그 위의 모든 파티션은 자동적으로 브로커 레지스트리에서 삭제됨.
      • 컨수머의 실패는 컨수머 레지스트리에서 컨수머에 해당하는 항목과 소유권 레지스트리에서 그것이 가진 모든 파티션을 삭제.
    • 각 컨수머는 브로커와 컨수머 레지스트리에 대해 ZooKeeper Watcher를 등록하고 브로커나 컨수머 그룹에서 변화가 일어나면 통지를 받음.
    • 컨수머의 시작이나 컨수머가 브로커/컨수머 변화에 대해 통지를 받으면, 컨수머는 소비할 파티션의 부분집합을 정하기 위해 리밸런스 프로세스를 시작.
      • 브로커와 컨수머 레지스트리를 읽어서 각 구독된 토픽에 대해 사용가용한 파티션의 집합과 그 토픽을 구독하고 있는 컨수머의 집합을 계산.
      • 파티션 집합을 범위로 파티셔닝(range-partition)해서 컨수머의 수 만큼의 청크로 나누고 소유할 청크를 고름.
      • 컨수머가 고른 각 파티션에 대해 소유권 레지스트리에 그 파티션의 새로운 소유자로 스스로를 써넣음.
      • 컨수머는 각 소유한 파티션으로부터 데이터를 당겨오는 쓰레드를 시작. 오프셋 레지스트리에 저장된 오프셋으로부터 읽기 시작.
      • 컨수머는 주기적으로 오프셋 레지스트리에 마지막으로 소비한 오프셋을 업데이트.
    • 그룹 내에 여러 컨수머가 있을 때 각각은 브로커/컨수머 변경에 대해 통지를 받지만, 그 통지가 컨수머마다 살짝 다르게 올 수 있음.
      • 하나의 컨수머가 다른 컨수머에 의해 아직 소유된 파티션에 대한 소유권을 가지려고 하는 것이 가능.
      • 이 경우, 첫번째 컨수머는 단순히 모든 파티션을 릴리즈하고 조금 기다린 후 리밸런스 프로세스를 재시도한다.
      • 리밸런스 프로세스는 단 몇번의 재시도 만에 안정화됨.
    • 새로운 컨수머 그룹이 만들어졌을 때는 오프셋 레지스트리에는 오프셋이 없지만, API를 사용해서 최소 또는 최대의 오프셋부터 시작할 수 있음.

Delivery Guarantees

  • Kafka는 at-least-once 배달만을 보장함.
    • exactly-once 배달은 보통 2PC를 필요로 하고, 카프카가 대상으로 하는 애플리케이션은 필요로 하지 않음.
    • 보통의 경우, 메시지는 각 컨수머 그룹에 정확히 한번만 배달됨. 하지만, 컨수머 프로세스가 정상적으로 종료되지 않고 죽은 경우, 그 컨수머가 소유하고 있던 파티션을 새롭게 소유하게된 컨수머는 ZooKeeper에 커밋된 마지막 오프셋 후의 메시지들에 대해 중복 메시지를 소비하게 됨.
    • 애플리케이션에 있어서 메시지의 중복이 중요한 문제라면, Kafka가 컨수머에게 돌려주는 오프셋을 이용하거나 메시지 내의 어떤 유일하게 식별 가능한 키를 사용해서 자체적인 중복 방지 로직을 추가해야함.
  • Kafka는 하나의 파티션으로부터의 메시지는 컨수머로 차레대로 전달되는 것을 보장. 하지만 서로 다른 파티션으로부터 오는 메시지의 순서는 보장하지 않음.
  • 로그가 망가지는 것을 피하기 위해 각 메시지에 대해 CRC를 저장.
    • Kafka는 CC를 가진 메시지를 삭제하는 리커버리 프로세스를 실행.
    • 이 CRC는 네트워크 에러를 체크할 수도 있게 해줌.
  • 브로커가 죽으면 그것 내에 저장된 소비되지 않은 메시지는 사용불가능해짐.
    • 미래에는 여러 브로커에 메시지를 중첩해서 저장하는 자체적인 복제를  추가할 예정.

Experimental Results

Producer test

producer_performance

Kafka의 Producer 성능이 좋은 이유를 아래와 같은 이유로 설명하고 있습니다.

  • Kafka 프로듀서는 브로커로부터의 ack을 기다리지 않고 브로커가 다룰 수 있는 만큼 빠르게 메시지를 보냄.
    • 배치 크기가  50일 때는 Kafka 프로듀서는 프로듀서-브로커 사이의 1GB 링크에 포화됨. (saturated)
    • ack이 없으므로 발행된 모든 메시지가 실제로 브로커에 의해 받아진 건지 보장할 수 없음.
    • 로그 데이터에 대해서는 영속성을 처리속도와 트레이드를 하는 것이 좋음.
    • 미래에 영속성이 중요한 데이터에 대한 개선을 계획 중임.
  • 효율적인 스토리지 형식
    • 평균적으로 Kafka에서는 각 메시지는 9바이트의 오버헤드를 가짐. 반면, ActiveMQ에서는 144 바이트.
    • ActiveMQ에서의 오버헤드는 JMS가 요구하는 무거운 메시지 헤더에서 옴.
    • 여러 인덱싱 구조를 유지하기 위한 비용도 존재.
  • 배치 방식은 RPC의 오버헤드를 감추어서 (amortize) 처리속도를 크게 개선함.

 

Consumer Test

consumer_performance

Kafka가 좋은 성능을 보여주는 이유를 다음과 같이 설명하고 있습니다.

  • 효율적인 스토리지 포맷
  • ActiveMQ, RabbitMQ는 모든 메시지의 배달 상태를 유지하기 위해 꾸준히 쓰기가 발생하지만, Kafka의 브로커는 소비를 위한 디스크 쓰기가 발생하지 않음.
  • sendfile API의 사용.

Future Works

Kafka가 미래에 지원할 기능으로 우선 영속성을 위한 자체적인 복제 지원을 들고 있습니다. 비동기-동기 복제를 둘다 지원하려고 하고, 애플리케이션의 여러가지 요구에 따라 적절한 중첩 레벨을 선택할 수 있도록 하려 한다고 합니다. 다른 하나는 스트림 프로세싱입니다. 기본적인 레코드 수를 센다든지, 다른 스트림이나 다른 스토리지의 데이터와 join하는 등의 처리가 가능하게 하려고 합니다. 우리는 현재의 Kafka는 스트림 프로세싱을 지원하지 않고, Kafka와 협동하는 다른 프레임워크에 위임하는 방식으로 이루어지고 있다는 것을 알고 있습니다.