Paper: TAO: Facebook’s Distributed Data Store for the Social Graph (Part 1)

Bronson, Nathan, et al. “TAO: Facebook’s Distributed Data Store for the Social Graph.” USENIX Annual Technical Conference. 2013.

Facebook은 사용자들 사이의 관계, 사용자들의 포스팅, 이에 대한 코멘트 등을 MySQL에 저장하고 memcache에 캐싱하고 있었는데, 이를 개선한 TAO라는 시스템에 관한 페이퍼. geographically 분산된 단일한 인스턴스라는 점이 놀라운 점. 이후에도 설명되겠지만 graph abstraction만으로 Facebook의 주요한 데이터들을 표현한다는 것도 매우 흥미로운 점.

Before TAO, Facebook’s web servers directly accessed MySQL to read or write the social graph, aggressively using memcache [21] as a lookaside cache. TAO implements a graph abstraction directly, allowing it to avoid some of the fundamental shortcomings of a lookaside cache architecture. TAO continues to use MySQL for persistent storage, but mediates access to the database and uses its own graph-aware cache.
TAO is deployed at Facebook as a single geographically distributed instance. It has a minimal API and explicitly favors availability and per-machine efficiency over strong consistency; its novelty is its scale: TAO can sustain a billion reads per second on a changing data set of many petabytes.

Background

Facebook의 타임라인과 같은 데이터를 구축할 때 흔히 컨텐트가 생성될 때 타임라인 데이터를 업데이트하는 방식과 타임라인을 읽을 때 타임라인 데이터를 업데이트하는 두가지 방식이 고려되는데, 이 페이퍼에서는 Facebook에서는 모든 항목에 대해 privacy check가 동적으로 이루어져야 하기 때문에 전자의 접근을 불가능하다고 얘기한다. 즉, 실시간으로 aggregation과 filtering이 이루어져야 한다는 것.

We present each user with content tailored to them, and we filter every item with privacy checks that take into account the current viewer. This extreme customization makes it infeasible to perform most aggregation and filtering when content is created; instead we resolve data dependencies and check privacy each time the content is viewed.

MySQL과 memcache를 이용한 원래의 아키텍쳐 대신 TAO를 만들어야 했던 이유로, PHP API에서의 encapsulation 실패 – 자세히는 설명되어있지 않지만, graph abstraction이 아니어서 발생하는 데이터 모델상의 여러가지 문제들을 말하는 것이 아닐까 싶다. – 그리고, PHP가 아닌 언어로부터의 접근 – 역시 이러한 요구사항은 흔히 아키텍쳐의 변화를 이끄는 동력이 되는 듯 – 그리고 lookaside 캐시 아키텍쳐의 여러가지 문제들을 들고 있다. 캐시 아키텍쳐의 문제들로는 다음과 같은 문제들을 들고 있다.

우선 edge들의 리스트를 표현하는 데에 있어서 특정 키에 해당하는 모든 edge 리스트를 가져와야 하는 key-value 캐시는 비효율적임을 들고 있다. 캐시에서 리스트를 직접 지원한다면 이러한 문제를 해결할 수 있으나 동시적인 업데이트에 따른 문제들을 언급하고 있다.

Inefficient edge lists: A key-value cache is not a good semantic fit for lists of edges; queries must always fetch the entire edge list and changes to a single edge require the entire list to be reloaded. Basic list support in a lookaside cache would only address the first problem; something much more complicated is required to coordinate concurrent incremental updates to cached lists.

액세스의 제어를 위한 로직들이 클라이언트에 있으므로 thundering herds와 같은 문제들이 발생하는 것을 언급하고 있다. 이를 캐시에 내장함으로써 더욱 효율적으로 문제를 해결할 수 있다고 얘기하고 있다.

Distributed control logic: In a lookaside cache architecture the control logic is run on clients that don’t communicate with each other. This increases the number of failure modes, and makes it difficult to avoid thundering herds. Nishtala et al. provide an in-depth discussion of the problems and present leases, a general solution [21]. For objects and associations the fixed API allows us to move the control logic into the cache itself, where the problem can be solved more efficiently.

Facebook은 MySQL의 비동기 master/slave 리플리케이션을 사용하고 있기 때문에 슬레이브를 사용하는 데이터센터의 캐시들에는 consistency 문제가 있다. 이 페이퍼에서는 가능한 한 복제본의 캐시의 consistency를 유지하기 위한 방법들을 제시하고 있다.

Expensive read-after-write consistency: Facebook uses asynchronous master/slave replication for MySQL, which poses a problem for caches in data centers using a replica. Writes are forwarded to the master, but some time will elapse before they are reflected in the local replica. Nishtala et al.’s remote markers [21] track keys that are known to be stale, forwarding reads for those keys to the master region. By restricting the data model to objects and associations we can update the replica’s cache at write time, then use graph semantics to interpret cache maintenance messages from concurrent updates. This provides (in the absence of multiple failures) read-after-write consistency for all clients that share a cache, without requiring inter-regional communication.

TAO Data Model and API

TAO의 Object는 64-bit integer로 식별되며, object type (otype)을 가지고 있다. Association은 source object (id1), association 타입 (atype), destination object (id2)로 식별된다. 임의의 두 object들 사이에서 특정 타입의 association은 최대 1개만 존재할 수 있다는 제약이 존재한다. Object와 association은 모두 key-value attribute들을 가지고 있으며 가능한 key들과 value들의 type, 그리고 default value는 type별 schema에 따라 정해진다. per-type schema라는 이 개념은 대단히 새로운 것은 아니지만 일반적인 모델의 attribute 정의에 대해 고민하던 내게 도움이 되었다.

TAO objects are typed nodes, and TAO associations are typed directed edges between objects. Objects are identified by a 64-bit integer (id) that is unique across all objects, regardless of object type (otype). Associations are identified by the source object (id1), association type (atype) and destination object (id2). At most one association of a given type can exist between any two objects. Both objects and associations may contain data as key→value pairs. A per-type schema lists the possible keys, the value type, and a default value. Each association has a 32-bit time field, which plays a central
role in queries1.

Object: (id) → (otype, (key  value)∗)
Assoc.: (id1, atype, id2) → (time, (key  value)∗)

TAO의 Object API는 Object에 대한 기본적인 CRUD에 더해서 field들의 subset을 업데이트할 수 있는 API를 지원하고 있다.

Association API도 association의 CRUD를 위한 API를 제공하고 있다. 매우 흥미로운 점은 친구 관계와 같이 양방향을 가지는 association의 경우, 그 association type을 inverse type으로 설정을 해주면 TAO association API에서 알아서 inverse association에 대해서도 operation을 실행해준다는 점이다. Graph DB에서 당연한 기능인지는 잘 모르겠지만, 이를 application layer에서 직접 구현하고자 한다면 boilerplate 코드가 되기 쉬운 부분들이 일반적으로 해소되고 있다고 생각한다. 한편, inverse type의 association을 추가할 때 가장 걱정이 되는 점은 두 association의 write가 atomic하게 반영될 수 있는가 일텐데, 이에 대해서는 페이퍼에서 명시적으로 언급하고 있지는 않은 듯 하다.

  • assoc add(id1, atype, id2, time, (k→v)*) – Adds or overwrites the association (id1, atype,id2), and its inverse (id1, inv(atype), id2) if defined.
  • assoc delete(id1, atype, id2) – Deletes the association (id1, atype, id2) and the inverse if it exists.
  • assoc change type(id1, atype, id2, newtype) – Changes the association (id1, atype, id2) to (id1,
    newtype, id2), if (id1, atype, id2) exists.

Facebook과 같은 서비스에서는 대부분의 데이터는 오래된 것이고 자주 액세스되어야 할 데이터는 최근 생성된 일부의 데이터라는 점을 creation-time locality라는 말로 표현하고 있다. TAO의 Association Query API들은 creation-time locality에 따른 cache 가능성을 높이기 위해 time range 쿼리가 가능한 점을 엿볼 수 있다. 역시 흥미로운 점은 Association Query API로부터 리턴되는 Association List의 association type별 제약이 정해져있다는 점이다. 잘은 모르지만 이러한 점들은 일반적인 Graph DB에서는 가할 수 없는 제약이 아닐까 싶다.

  • assoc get(id1, atype, id2set, high?, low?) – returns all of the associations (id1, atype, id2) and their time and data, where id2 ∈ id2set and high ≥ time ≥ low (if specified). The optional time
    bounds are to improve cacheability for large association lists (see § 5).
  • assoc count(id1, atype) – returns the size of the association list for (id1, atype), which is the number of edges of type atype that originate at id1.
  • assoc range(id1, atype, pos, limit) – returns elements of the (id1, atype) association list with index i ∈ [pos,pos+limit).
  • assoc time range(id1, atype, high, low, limit) – returns elements from the (id1, atype) association list, starting with the first association where time ≤ high, returning only edges where time ≥ low.

Paper: A simple totally ordered broadcast protocol

Reed, Benjamin, and Flavio P. Junqueira. “A simple totally ordered broadcast protocol.” proceedings of the 2nd Workshop on Large-Scale Distributed Systems and Middleware. ACM, 2008.

이 페이퍼는 Zab라는 ZooKeeper의 내부에 구현되어있는 ordered broadcast 프로토콜을 소개하고 있다. 이 페이퍼는 informal한 형태로 Zab에 대한 요구사항과 동기, 그리고 프로토콜 자체에 대해서 설명하고 있으므로 이를 파악하기에 좋은 것 같다. Zookeeper에 대한 페이퍼Zab에 대한 좀 더 formal한 페이퍼도 따로 있으므로 앞으로 읽어볼 것.

The logical components of the ZooKeeper service

Zab는 Zookeeper Atomic Broadcast라는 이름대로 Zookeeper의 노드들 사이에 동일한 오퍼레이션들의 순서를 유지하고 결과적으로 노드들의 상태를 동일하게 유지되기 위해 사용된다. (Atomic Broadcast 참고.)

Zookeeper는 클라이언트로부터의 request들을 그대로 Zab로 보내는 것이 아니라 conditional operation이나 version increment를 포함하고 있어서 본질적으로 idempotent하지 않은 request를 idempotent한 transaction으로 변환하는 과정을 가지고 있다. 이 변환은 leader 노드에서만 발생할 수 있으며, 그 이유는 leader 노드만이 오퍼레이션으로 인해 변할 미래의 상태에 대한 완벽한 정보를 가지고, 새로운 값을 정할 수 있기 때문이다.

Requirements

Zookeeper는 다음과 같은 요구사항을 가지고 있다.

  • Reliable delivery
    • If a message, m, is delivered by one server, then it will be eventually delivered by all correct servers.
  • Total order
    • If message a is delivered before message b by one server, then every server that delivers a and b delivers a before b.
  • Causal order
    • If message a causally precedes message b and both messages delivered, then a must be ordered before b.
  • Prefix property
    • If m is the last message delivered for a leader L, any message proposed before m by L must also be delivered.

우리가 알고 있는 Zookeeper의 특성 상 다른 것들은 쉽게 추론할 수 있으나, 클라이언트 입장에서의 correctness를 표현한 Causal order requirement는 이러한 말로 표현하는 것을 아직 자주 본 것은 아니라서 눈여겨보게 되었다.

Zookeeper의 어떤 노드가 실패했을 때 snapshot을 읽어들인 다음 snapshot 이후에 배달된 transaction들을 리플레이하게 되는데, 위에서 설명했던 idempotent한 transaction 덕에 이러한 복구 과정에서 atomic broadcast는 at most once delivery를 보장하지 않아도 된다. 이는 또한 복구 과정에서는 total order requirement가 완화되는 것을 의미하기도 한다.

Motivation

이 부분에서는 왜 이미 존재하는 프로토콜을 사용하지 않고 Zab를 고안해서 사용하게 되었는가를 각 프로토콜을 들어 설명하고 있다. 굉장히 많은 페이퍼들을 인용하며 설명을 하고 있는데, 가장 중요한 것은 역시 Paxos와의 비교라고 할 수 있다. 이 페이퍼는 현실의 조건에 따른 가정을 토대로 Paxos를 단순화시킬 수 있다고 이야기 하고 있다.

Paxos는 메시지의 손실과 순서가 바뀌는 것 조차도 허용하고 있지만, TCP를 사용한다면 메시지가 FIFO 순서로 전달되는 것을 가정할 수 있고, Zab는 이에 따라 per-proposer causality를 보장할 수 있다. Paxos는 FIFO channel을 가정하지 않기 때문에 이러한 보장을 할 수 없다.

두번째로 Paxos에서는 복수의 leader로부터 하나의 인스턴스에 대한 proposal이 가능하므로, proposal이 서로 충돌하거나 어떤 값이 commit 되었는지 알아내기 힘든 문제가 있다. Paxos에서는 어떤 노드라도 높은 투표만 얻으면 리더가 될 수 있는 반면, Zab에서는 다수의 노드가 기존의 리더를 버리지 않는 이상 새로운 리더가 될 수 없다. 이로 인해 Zab에서는 주어진 인스턴스에 대해서 단 하나의 proposal을 보장할 수 있고, 이는 프로토콜을 여러 면에서 단순화 시킨다.

Protocol

Zab protocol은 recovery와 broadcast 두 개의 모드로 이루어져있다. Zookeeper를 처음으로 실행했을 때나 leader 실패가 발생했을 때는 recovery 모드가 되고, leader가 성립되고 quorum이 leader와 동기화를 완료하게 되면 broadcast 모드가 된다.

Zookeeper는 broadcast의 leader를 (request를 transaction으로 변환하기 위한) write request의 leader로도 사용하기 때문에, 만약 분리되었을 경우 발생할 둘 사이의 latency를 제거한다.

Broadcast

단순화된 two-phase commit과 유사하다고 얘기하고 있다. two-phase commit과는 달리 Zab에는 abort가 없기 때문에 follower는 leader의 proposal을 수용하거나 leader를 포기하는 두가지의 선택지 밖에는 없으며, 또한 leader는 quorum을 이룬 follower들로부터 ack을 받는다면 commit을 진행할 수 있다.

broadcast 프로토콜은 모든 커뮤니케이션에 대해 FIFO 채널을 이용하므로 순서를 보증하는 것이 매우 쉽다.

leader가 proposal을 보낼 때는 zxid라고 불리는 단조증가하는 ID를 부여한다. Zab는 causal ordering을 보존하기 때문에 전달된 메시지는 zxid들에 의해서도 정렬된다. 이 후에 proposal은 follower별로 존재하는 queue에 넣어지고, 이는 FIFO 채널을 통해 follower로 전달된다. follower는 받은 proposal을 디스크에 쓰고 leader에게 ack을 보낸다. leader가 다수의 follower로부터 ack을 받으면 commit을 broadcast하고 leader 자신에게 메시지를 배달(deliver)한다. follower들은 commit을 받았을 때 메시지를 배달한다.

Recovery

Recovery에서 중요한 것은 commit된 모든 메시지를 제대로 보존하는 것과 commit되지 않은 – skip된 메시지를 보존하지 않는 것이다.

commit된 모든 메시지를 보존하는 문제는 leader election 과정에서 quorum 서버 중에서 가장 높은 proposal number를 가지고 있는 노드를 leader로 선택하는 것이다. 새로 선출된 leader는 새로운 proposal을 처리하기 전에 트랜잭션 로그에 있는 모든 메시지들이 propose되고 quorum에 의해 commit되도록 한다.

새로운 follower 노드가 추가되었을 때는 leader는 우선 follower가 보지 못한 proposal을 모두 queueing하고 그 다음 마지막으로 commit된 proposal까지 commit들을 queueing한 후에 새로운 follower노드를 broadcast 리스트에 등록한다.

skip된 메시지를 보존하지 않는 것은 epoch을 이용한다. Zookeeper의 zxid는 64bit 숫자인데, lower 32bit은새로운 proposal을 생성할 때마다 증가하는 counter이고, high order 32bit은 epoch에 해당한다. 새로운 leader가 선출될 때마다 로그 상에 가지고 있는 가장 높은 zxid의 epoch에 1을 더하고, counter를 0으로 리셋한 zxid를 이후의 proposal을 위해 사용한다.

epoch을 통해서 여러 리더들이 동일한 zxid를 사용하는 것을 방지할 수 있을 뿐만 아니라, 더욱 중요하게는, 한번 실패했던 리더가 되살아나더라도 zxid가 낮기 때문에 리더가 될 수 없고, 이 노드가 follower가 되었을 때 leader는 기존 epoch의 proposal을 truncate하도록 해주기 때문에, recovery 과정이 단순해지고 빨라지는 장점을 가지고 있다.

Closing

Raft 등에서도 채용된 여러가지 개념들을 볼 수 있었다는 점이 재미있었다. two-phase commit-like 프로토콜이나 quorum이 유지되는 한 leadership의 유지, epoch (term)의 개념 등. 또한 다른 consensus 프로토콜들을 통해 어렴풋이 추측하고 있던 Zookeeper의 프로토콜에 대해 조금 더 정확하게 이해할 수 있게된 것 같다. 사실 Zab 이외에도 여러가지 알고리즘이나 최적화 등이 Zookeeper에 활용되었으리라 추측하는데, 위에서 언급한 Zookeeper에 관련한 다른 페이퍼들이나 Zookeeper의 코드를 읽어보아도 재미있을 것 같다.

Paper: Conflict-free Replicated Data Types

Shapiro, Marc, et al. “Conflict-free replicated data types.” Stabilization, Safety, and Security of Distributed Systems. Springer Berlin Heidelberg, 2011. 386-400.

Distributed system에서 replica 사이의 consistency를 유지하는 것은 매우 어려운 문제지만, CRDT라고 불리는 특별한 데이터구조들은 semilattice나 commutativity와 같은 그 자체의 성질을 이용하여 consistency의 문제를 훨씬 단순한 문제로 만들어준다. CRDT에 해당하는 데이터구조나 수학적인 성질들은 오래전부터 알려져있었고 활용되어왔지만, 이 페이퍼의 저자들이 2007년에 처음으로 CRDT라는 이름을 붙였다. 2009년에는 Treedoc이라는 온라인 협업 에디팅을 위한 데이터구조를 제안하면서 역시 CRDT의 한가지 예로서 제시한다. 하지만 이 때까지도 CRDT의 정의는 commutativity를 이용한다 정도로 매우 모호한 상태였지만, 2011년에 출판된 이 페이퍼는 CRDT의 성질을 수학적으로 정의하고 증명한 것에 큰 의미가 있는 것으로 보인다. 이 페이퍼와 같은 해에 출판된 “A comprehensive study of convergent and commutative replicated data types.”란 페이퍼에서는 이 페이퍼의 개념들을 더욱 자세하게 설명할 뿐만 아니라 알려진 CRDT들의 카탈로그를 제시하고 자세하게 설명하고 있다. 이 페이퍼는 오히려 그 페이퍼의 정수만을 간추려놓은 논문으로 보인다.

Strong Eventual Consistency

Eventual consistent 시스템의 경우 필연적으로 conflict resolution이 필요하게 되는데, 이러한 conflict resolution의 부담을 덜어주는 메커니즘으로서 Version vector나 Dotted version vector 등을 사용한다. CRDT는 근본적으로 conflict가 발생하지 않는 데이터 구조이며, 따라서 causality tracking을 필요로 하지 않을 뿐더러 conflict resolution을 위한 coordination 등을 필요로 하지 않는다. 이러한 특성을 Strong Eventual Consistency라고 이야기 하고 있다.

We propose a simple, theoretically-sound approach to eventual consistency. Our system model, Strong Eventual Consistency or SEC, avoids the complexity of conflict resolution and of roll-back. Conflict-freedom ensures safety and liveness despite any number of failures. It leverages simple mathematical properties that ensure absence of conflict, i.e., monotonicity in a semi-lattice and/or commutativity. […] In our conflict-free replicated data types (CRDTs), an update does not require synchronisation, and CRDT replicas provably converge to a correct common state. CRDTs remain responsive, available and scalable despite high network latency, faults, or disconnection.

우선, Eventual Consistency를 어떤 replica의 causal history에 포함된 method는 결국에는 (eventually) 다른 replica의 causal history에 포함되는 것으로 정의하고 있다. 그리고 Convergence의 특성을 추가적으로 실행되는 method가 없어서 서로 다른 replica의 causal history가 동일하게 유지되는 상황에서는 결국에는 두 replica의 상태가 동등해지는 것으로 정의하고 있다. □(Globally)나 ◊(Finally)와 같은 notation은 처음으로 본 것인데 Temporal Logic에서 사용되는 notation인 것 같다.

Definition 2.2 (Eventual Consistency (EC)). Eventual delivery: An update delivered at
some correct replica is eventually delivered to all correct replicas: ∀i, j : fci ⇒ ◊f
cj .
Convergence: Correct replicas that have delivered the same updates eventually reach equivalent
state: ∀i, j : □ci = cj ⇒ ◊□sisj .
Termination: All method executions terminate.

Strong Eventual Consistency는 여기에 더해서 서로 다른 replica의 causal history가 동일하다면 동등한 상태를 가지는 특성으로 정의하고 있다. 이 paper에서는 한 replica의 causal history를 실행된 method들의 집합으로 정의하고 있으므로 – 그 method들에 대한 순서나 인과성 등의 정보는 포함하지 않고 있으므로 causal history라고 부르는 것은 조금 이상한 것 같다 – 정확히는 실행된 method들의 집합이 동일할 경우라고 보면 될 것 같다.

Definition 2.3 (Strong eventual consistency (SEC)). An object is Strongly Eventually Consistent if it is Eventually Consistent and:
Strong Convergence: Correct replicas that have delivered the same updates have equivalent state: ∀i, j : ci = cj ⇒ si ≡ sj .

State-based CRDT (CvRDT) and Operation-based CRDT (CmRDT)

CRDT를 2가지로 분류하고 있다. 첫번째는 상태들을 merge하는 method를 기반으로 정의하는, Replica들의 상태가 converge할 수 있는 형태의 CRDT인 Convergent Replicated Data Type (CvRDT)이고, 두번째는 operation들이 commutative한 성질을 기반으로 정의하는 CRDT인 Commutative Replicated Data Type (CmRDT)이다. Paper에서는 두 가지의 CRDT는 서로를 emulation할 수 있으므로 equivalent하다고 이야기 하고 있다.

우선 CvRDT의 정의를 이해하기 위해서는 lattice와 semilattice라는 수학 개념에 대해서 이해를 해야한다. Lattice란 기본적으로 어떤 ordering에 대해 정의되는 부분순서집합이며, 임의의 두 원소에 대해서 유일한 join과 meet가 그 집합 내에 존재해야한다. 여기서, join과 meet는 least upper bound와 greatest lower bound라고도 불리며, 우리에게 익숙한 개념으로 보자면, 최소공배수와 최소공약수에 해당하는 개념이다. 이 때, 최소공배수와 최소공약수가 각각 join과 meet가 되는 lattice는 배수를 ordering으로하는 정수집합에 해당한다. 정수집합의 모든 두 원소가 서로 배수관계가 있는 것은 아니지만 – 즉 부분순서집합 – 모든 두원소는 최소공배수와 최소공약수를 가진다. Semilattice란 join 또는 meet 중 하나를 가지는 부분순서집합이고, 어느 쪽이냐에 따라서 join-semilattice와 meet-semilattice로 불린다.

CvRDT에서 정의되는 객체는 객체들의 상태 집합에 대한 join-semilattice이고, merge method는 join 즉 least upper bound로 정의된다. 다시 말하면, 서로 다른 두 replica의 어떤 두 상태가 주어지더라도 이 상태를 merge한 상태가 정의되어있다고 할 수 있다. 이러한 객체가 SEC를 만족하는 것에 대한 증명은 어느 한쪽의…

Definition 2.4 (Monotonic semilattice object). A state-based object, equipped with partial order ≤, noted (S,≤, s0, q, u,m), that has the following properties, is called a monotonic semi-lattice: (i) Set S of payload values forms a semilattice ordered by ≤. (ii) Merging states with remote state s′ computes the LUB of the two states, i.e., s •m(s′) = s⊔s′. (iii) State is monotonically non-decreasing across updates, i.e., s ≤ s • u.
Theorem 2.1 (Convergent Replicated Data Type (CvRDT)). Assuming eventual delivery and termination, any state-based object that satisfies the monotonic semilattice property is SEC.

CmRDT는 말그대로 update method의 commutativity에 기초해서 상대적으로 단순하게 정의되고 있는데, 중요한 것은 concurrent update에 대해서만 commutativity 특성을 요구하고 다른 모든 update는 causal하게 전달되는 것을 가정하고 있다. 현실적으로는 어떤 경우에 어떤 update가 commutative하게 일어나고 어떤 경우에는 그렇지 않은 것은 쉽지 않기 때문에 이러한 제약을 주는 이유에 대해서 조금 의문이 들고, 특히 엄밀하게 정의되어있지 않은 CvRDT와 CmRDT의 equivalence에 대해서도 의문이 들게 만드는 부분이다.

Definition 2.6 (Commutativity). Updates (t, u) and (t′, u′) commute, iff for any reachable replica state s where both u and u′ are enabled, u (resp. u′) remains enabled in state s • u′ (resp. s • u), and s • u • u′ ≡ s • u′ • u.
Theorem 2.2 (Commutative Replicated Data Type (CmRDT)). Assuming causal delivery of updates and method termination, any op-based object that satisfies the commutativity property for all concurrent updates, and whose delivery precondition is satisfied by causal delivery, is SEC.

Example CRDTs

먼저 increment-only integer counter (G-Counter)를 위한 CRDT를 제시하고 있는데, 여러 replica는 integer vector의 각 replica에 해당하는 component를 increment하고, 두 vector의 merge operation은 각 component의 max를 취한 vector를 돌려주는 것으로 정의하고 있다. value method는 모든 component의 합에 해당하는 값을 돌려주게 된다.

increment와 decrement 둘다 가능한 integer counter (PN-Counter)를 위해서는 위에서 정의한 increment-only counter 2개를 결합하여, 하나는 increment를 기록하고 다른 하나는 decrement를 기록해서 실제 value method는 둘 사이의 차에 해당하는 값을 돌려주는 CRDT를 제시하고 있다.

여기서 설명한 G-Counter, PN-Counter를 포함해 G-Set, 2P-Set 등으로 이름지어진 여러 CRDT, 그리고 Directed Graph의 operation-based CRDT 구현 등에 대해 간략하게 설명하고 있는데, 위에서 언급한 “A comprehensive study of convergent and commutative replicated data types.”에서 훨씬 자세한 설명을 하고 있으므로 이 글에서는 생략하도록 한다. 이에 대해서 알고자 한다면 다음 페이지를 참고하면 좋을 것이라고 생각한다.

  • https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type
  • https://vaughnvernon.co/?p=1012
  • http://blog.plasmaconduit.com/crdts-distributed-semilattices/
  • https://github.com/aphyr/meangirls

Closing

  • CRDT가 distributed system의 여러가지 도구들 – vector clock, log, … – 과 commutativity라는 관점에서 유사성을 공유하고 있는 것을 고려하면, CRDT의 정식화나 실제 CRDT들에 대해서 생각해보는 것은 좋은 도구가 되며 distributed system을 다루기 위한 사고 훈련이 되는 것 같다.
  • CRDT가 모든 문제를 해결해줄 수는 없으나 이미 알려진 CRDT들을 이용해서도 애플리케이션 관점에서 의미있는 이익을 얻을 수 있는 것으로 보인다. Garbage collection과 같은 실제적인 이슈들이 있으나 해결이 불가능한 것은 아니고, Riak 2.0 등에서 CRDT를 도입한 것을 볼 때 실제적인 응용을 시도해 볼 가치가 있다고 생각한다.
  • 이 논문의 저자들의 2007년, 2009년에 출판한 논문들은 CRDT라는 이름만 붙어있을 뿐이지 수학적인 정식화 등은 전혀들어있지 않다. 하지만, 이름을 붙이고 수년동안 연구를 하면서 완벽하지는 않지만 어느 정도의 엄밀함을 가진 모델을 성립시키고 하나의 분야로서 다루어질 수 있게 된 것 같다. 물론 이것만 하더라도 대단한 것이지만, A급 컨퍼런스의 A급 논문들만 보다보면 처음부터 거추장스러운 것은 하나도 없고, 논리적으로 완벽하며 또한 해당 분야에서도 중요한 의미를 지니고 있는 세상에 없을만한 것으로 보이곤 한다. 이 둘을 보면서, 평범한 과학자나 엔지니어들의 현실은 오히려 이러한 A급 논문보다 이들 논문들과 더 가깝지 않을까라는 생각이 조금 들었다.

Paper: Facebook Wormhole

Sharma, Yogeshwer, et al. “Wormhole: Reliable pub-sub to support geo-replicated internet services.” NSDI, May. 2015.

Challenges to Adopting Stronger Consistency at Scale 논문을 소개할 때도 언급한 바가 있는 이 논문은 Wormhole이라는 페이스북의 스토리지 업데이트를 여러 애플리케이션에 전달하기 위한 pub-sub 시스템을 소개하고 있다.

페이스북에서 사용자가 포스트를 올리면 데이터베이스의 쓰기가 발생하게 되는데, 이러한 쓰기 오퍼레이션은 다른 사용자의 뉴스 피드를 갱신한다거나, 캐시 무효화, 인덱스 등의 내부 시스템에 사용할 필요가 있다. 이를 위한 한가지 접근은 데이터베이스의 쓰기 오퍼레이션을 전부 메시지 큐로 전달하고 이를 여러 consumer에게 전달하는 방식인데, 페이스북 웜홀은 페이스북의 기존의 스토리지 (MySQL, RocksDB, HDFS)의 데이터가 엄청나기 때문에, 이를 별도의 메시지큐에 복제하는 것을 피하고 접근을 선택해야만 했다.

wormhole_architecture

Wormhole의 전체적인 구조는 Producer, Datastore, Publisher, Subscriber로 이루어지며, Facebook의 서비스에 해당하는 Producer가 역시 서비스를 위한 Datastore (MySQL, RocksDB, HDFS)를 업데이트하면, 일반적으로 Datastore와 동일한 서버에서 동작하는 Publisher가 Datastore의 WAL을 읽어서 이를 ‘Wormhole update’라는 공통적인 포맷 (키-밸류로 이루어진 해시 + 메타데이터)으로 만들고, 이를 Subscriber들에 전달하는 방식이다.

Datastore가 sharding 되어있기 때문에, Wormhole update에는 shard에 대한 정보가 추가되어 전달된다. 아마도 이를 이용해서 Replication 등에 활용할 수 있으리라 생각된다. 하나의 Subscriber 애플리케이션도 당연히 여러 Subscriber로 sharding될 수 있는데, 하나의 Datastore shard로부터의 Flow는 하나의 Subscriber로만 전달된다. (Replication 등을 위해) 하나의 shard로부터의 순서를 보장하기 위해서는 이 요건은 필수적인 것으로 보인다. 이러한 방식 때문에 Subscriber들 사이의 조율에 대한 필요성이 줄어들기도 할텐데, 이러한 부분에서는 Kafka가 떠오르기도 한다.

Publisher는 ZooKeeper를 이용해서 Subscriber들을 관리하며, Subscriber가 마지막으로 읽고나서 acknowledgement를 보낸 로그상의 위치를 기록한다. Subscriber는 읽고 있는 위치를 특별히 상태로서 관리하지 않아도 된다. 이러한 면에서, Kafka의 경우 Consumer가 읽어야 하는 위치를 상태로서 유지하고 broker로부터 pull하는 방식 임에 반해, Wormhole은 Publisher가 Subscriber로 push하는 정반대의 방식이라고 볼 수 있다.

Kafka와 같은 시스템에서는 일정 기간 동안의 버퍼를 가지고 있기 때문에 하나의 Consumer가 지연되더라도 단순히 이전의 offset으로 pull하는 방식으로 해결하는데, Wormhole은 내부적으로 이러한 버퍼에 해당하는 스토리지를 따로 가지고 있는 것이 아니기 때문에, Publisher는 각 Subscriber의 Flow를 별도로 관리해서 서로 다른 WAL의 위치를 읽어들일 수 있는 기능을 제공한다. 이러한 기능 덕분에 하나의 Subscriber가 느려진다고 해서 다른 Subscriber가 반드시 느려지지 않을 수 있게 된다.
여기서 문제는 서로 다른 n개의 Subscriber가 Datastore의 서로 다른 위치를 읽어들이고 있다면 Datastore의 랜덤 디스크 I/O가 증가해서 Producer 즉, 서비스의 성능에도 영향을 주게 된다. Wormhole은 이러한 문제를 피하기 위해 서로 다른 속도를 가진 Subscriber들에 대해 항상 별도의 Flow가 할당되는 것이 아니라, Subscriber가 읽고 있는 위치나 속도에 따라서 Flow들을 적당히 묶어서 Caravan이라는 개념을 도입하고 있다. 이러한 수준의 최적화까지 갖추고 있는 것은 물론 페이스북의 실제적인 필요에 의한 것이겠지만, Wormhole이 상당히 성숙된 시스템임을 느끼게 한다.

Wormhole은 간단한 필터링 메커니즘을 가지고 있어서, 각각의 Subscriber 애플리케이션이 제공한 필터에 따라서 Publisher는 필터링된 업데이트만을 Subscriber에 전달한다. 필터링은 키-밸류들의 집합에 해당하는 Wormhole update에 대한 오퍼레이션들에 대한 AND, OR로 표현된다. 이 오퍼레이션들에는 키가 존재하는지, 어떤 키의 값이 특정 값인지, 어떤 키의 값이 특정 집합에 포함되는지, 이러한 오퍼레이션들의 역들이 있다. 이러한 간단한 필터링이 존재할 수 있는 것은 불투명한 데이터를 전달하는 것이 아니라 공통적인 데이터 포맷이 정의되어 때문이다. 또한, Subscriber 애플리케이션에 따라서 모든 데이터가 필요하지 않은 경우는 매우 쉽게 상상할 수 있으므로, 이러한 기능 역시 페이스북의 매우 실제적인 요구사항에 근거한 것으로 보인다.

wormhole_mcrd_architecture

쉽게 상상할 수 있듯이 Datastore는 보통 복제본을 가지고 있고, 어떤 Datastore 샤드가 실패했을 때, 해당 샤드에 대한 Wormhole의 설정을 직접 바꾸거나 마스터가 복구될 때까지 기다리는 것이 아니라, 자동적으로 복제본을 이용해 Subscriber로 데이터를 계속 전달할 수 있도록 하기 위한 메커니즘을 가지고 있는데, Wormhole 논문에서는 이를 Multiple-Copy Reliable Delivery (MCRD)라고 부르고 있다. 각 Publisher는 Zookeeper의 ephemeral node를 이용해서 fail-over에 대한 coordination을 한다.

Wormhole에서 MCRD가 매우 독특한 점은 복제본들 사이에서 WAL내의 동일한 레코드의 물리적인 위치는 서로 다르기 때문에, MCRD가 아닌 경우와 달리 물리적인 위치 정보를 기록해두어도 fail-over 시에는 아무런 의미가 없다는 점이다. 따라서, MCRD 모드에서는 논리적인 위치 – 단조증가하는 sequence number나 timestamp를 ZooKeeper에 저장해둔다. 여기서 어려운 이슈는 실제로 DataStore의 종류에 따라 이러한 논리적인 위치를 정의하고, 이러한 논리적인 위치가 주어졌을 때, 이를 물리적인 위치로 변환하는 일인데, 이를 위해 DataStore에 따라 logical positions mapper가 제공된다.

Wormhole의 운영적인 측면에서 재미있는 사실 하나를 언급하고 있는데, 1%의 Datastore에 장애가 발생해서 Wormhole publisher들이 제대로 동작하지 않게 되면, 1%의 데이터가 제대로 전달되지 않아 1%의 stale한 캐시가 발생하게 될텐데, 좀 더 자세히 들여다보면 100% 사용자의 1% 데이터가 stale하게 되는 것이 아니고, 1% 사용자의 100% 데이터가 stale하게 되는 것이기 때문에, Publisher의 신뢰성이 중요하다라고 강조를 하고 있다. 이러한 설명으로 미루어볼 때 Facebook의 대부분의 Datastore는 사용자별로 sharding이 되어있는 것을 알 수 있다. 🙂

또 하나 재미있는 점은 분산된 deployment 시스템을 이용한다는 점이다. Datastore는 장비가 추가되거나 빠지는 경우가 흔하기 때문에, Wormhole Publisher도 이에 따른 관리를 해주기 위해서 초기에는 중앙 집중적인 관리 시스템을 사용했다고 한다. 하지만, 이러한 시스템 하에서는 아주 많은 장비들에 대해 관리를 할 때 실수가 발생할 가능성이 높았기 때문에, Wormhole monitor라는 가벼운 프로그램을 동작시키고, 이 프로그램이 주기적으로 설정을 검사해서 Publisher를 실행할지 말지, 그리고 어떤 설정으로 Publisher를 실행할지 등을 정하는 분산 관리 시스템으로 바꾸었고, 훨씬 더 신뢰성있고 사용하기 쉬워졌다고 한다.

Wormhole은 페이스북과 같은 대규모 인터넷 서비스를 위한 아키텍쳐에서 스토리지에 대한 요구사항을 특별히 늘리지 않으면서도 스토리지에 저장되는 데이터를 소비해야하는 다양한 애플리케이션들에게 매우 효율적이고 신뢰성 있게 데이터를 전달해주는 시스템으로 보인다. 보다 다양한 Datastore에 대한 지원 등이 쉽지는 않을 수도 있다고 생각하지만, 페이스북 내에서는 충분한 수준이고, Wormhole의 디자인에서 엿볼 수 있는 여러가지 실용적인 선택들은 다른 시스템에도 여러가지로 적용해볼만 하다고 생각한다.

Paper: Paxos Made Live – An Engineering Perspective

Chandra, Tushar D., Robert Griesemer, and Joshua Redstone. “Paxos made live: an engineering perspective.” Proceedings of the twenty-sixth annual ACM symposium on Principles of distributed computing. ACM, 2007.

이 논문은 Paxos를 실제로 구현하고자 할 때 고려해야할 현실적인 문제들과 해결방식을 설명하고 있다. Google은 Chubby에서 필요로하는 분산 로그 스토리지를 위해 기존의 상용 솔루션을 대체하는 Paxos를 구현하게 되었다고 한다.

single_chubby_system

Paxos를 이용한 분산 로그 스토리지의 기본적인 아이디어는 데이터베이스 오퍼레이션에 대해서 Paxos 알고리즘을 반복적으로 적용하면, 복제본들 사이에 데이터베이스 오퍼레이션들의 로그를 동일하게 쌓아올릴 수 있고, 결과적으로 분산 로그 스토리지를 구현할 수 있게 된다. Chubby가 기반하고 있는 분산 DB는 Paxos로 구현된 분산 로그 스토리지에 저장된 데이터오퍼레이션들의 리플레이 로그와, 스냅샷을 관리한다. 흥미로운 점은 분산 로그 스토리지는 다른 분산 시스템을 만들 때에도 강력한 기초가 될 수 있기 때문에, 분산 로그와 분산 DB 레이어 사이를 깔끔한 인터페이스로 정의해서 분산 로그를 재사용할 수 있도록 모듈화를 꾀한 점이다.

이 논문에서는 Paxos 알고리즘에 대한 괜찮은 설명이 나오는데, Multi-Paxos에 대한 언급도 나온다. 그동안 Paxos 알고리즘의 최적화에 해당하는 Multi-Paxos라는 단어를 도입한 것이 이 논문이라고 생각하고 있었는데, 이번에 블로그 글을 정리하다가 다시 찾아보니, 애초에 Paxos의 오리지널 논문인 Lamport의 “Part-time Parliament”에서 Multi-decree parliament라는 알고리즘으로 소개되고, 이를 Prisco, Lampson, Lynch의 “Revisiting the Paxos algorithm”에서 처음으로 “Multi-Paxos”라는 이름을 사용한 것으로 보인다.

Handling disk corruption

디스크 손상에 의한 두가지 상황, 즉 파일의 내용이 바뀌었거나 파일 자체가 없어지는 경우에 대한 탐지 방법을 제시하고 있다. 전자는 각 파일에 대한 checksum을 유지하는 방법으로 해결하고, 후자는 리플리카였음을 나타내는 marker를 GFS에 보관하는 방법으로 디스크 손상을 탐지한다. (이러한 방법과 관련해 역시 유명한 Viewstamped Replication 논문을 인용하고 있으므로 나중에 꼭 읽어보도록 하자.)

디스크 손상이 탐지되었을 경우에는 리빌드를 진행하게 되는데, non-voting member 즉, promise나 acknowledgement 메시지에는 응답하지 않는 노드로서 Paxos에 참가하고 catch-up mechanism을 사용한다. catch-up mechanism 자체에 대해서는 이 논문에는 그리 자세한 설명은 없다.

이렇게 디스크 손상에 대비하는 메커니즘이 있다면 모든 쓰기를 디스크에 바로 flush하지 않아도 되는 최적화가 가능해진다.

Master leases

stale data를 읽지 않도록 보장하기 위해서는, 읽기 오퍼레이션 조차도 Paxos로 실행해서 다른 업데이트들과 직렬화되도록 하는 것이 필요하다. 일반적으로 읽기 오퍼레이션이 대부분의 오퍼레이션을 차지하므로 이는 매우 비효율적이다.

이에 대한 해결책으로 제시하는 것이 마스터 리스이고, 이는 마스터가 리스를 가지고 있는 동안에는 다른 복제본이 Paxos로 업데이트를 진행할 수 없는 것을 보장하기 때문에, 읽기 오퍼레이션을 로컬에서 제공할 수 있게 된다. 그리고, 리스가 만료되기 전에 보통 갱신되기 때문에 마스터가 대체로 항상 리스를 가지고 있게 된다.

Epoch numbers

어떤 복제본으로 업데이트 요청이 왔을 때 마스터의 지위를 잃어버리거나 다시 얻었다면, 그 요청은 중단되어야 하는데, 이를 구분하기 위해 어떤 마스터가 연속적으로 재임하는 동안에는 같은 값으로 유지되는 Epoch number라는 개념을 도입하고 있다. 아마도 Raft 알고리즘의 term과 동일한 개념이 아닐까 싶다.

Group membership

그룹 멤버쉽 문제란 복제본이 추가되거나 제거되는 등 복제본의 집합에 일어난 변화를 어떻게 다루는지에 대한 문제다. 그룹 멤버쉽 문제에도 Paxos를 활용해 해결할 수가 있다는 아이디어를 인용하고 있는데, 아이디어 자체는 단순하나 – 복제본 집합의 변화의 로그를 Paxos로 유지하면 될 것이다 – 디스크 손상 등의 문제들을 고려해서 구현하는 것은 쉽지 않다고 얘기하고 있다.

Snapshots

로그를 무한정 쌓을 수 없으므로 스냅샷을 생성해서 기존의 로그가 필요하지 않게 만드는 메커니즘을 필요로 한다. Paxos 자체는 복제된 로그의 일관성에만 관심이 있지, 복제되고 있는 데이터 구조 자체에 대해서는 인지하지 못하므로, 스냅샷을 생성하는 것은 애플리케이션의 책임이 된다. 따라서, 애플리케이션은 자유롭게 스냅샷을 생성하고, 스냅샷이 생성된 것에 대해서 Paxos 프레임워크 쪽에 알려주어, Paxos 프레임워크가 스냅샷 이전의 로그를 삭제할 수 있도록 한다. 스냅샷은 복제본 사이에 동기화되지 않으며, 각각의 복제본은 독립적으로 스냅샷을 생성할 시점을 결정한다.

이러한 메커니즘은 간단해보이지만, 로그와 스냅샷이 서로 일관성있게 유지되도록 하기 위한 복잡성들이 존재한다.

  • 각각의 스냅샷에 관련된 모든 Paxos 관련 정보 – 스냅샷을 생성하는 시점의 Paxos instance number와 그 시점의 그룹 멤버쉽 – 를 담고 있는 스냅샷 핸들이라는 개념이 존재한다. 스냅샷을 복구할 때는, 애플리케이션은 스냅샷 핸들을 Paxos 프레임워크 쪽에 전달하고, Paxos 프레임워크 이를 이용해 복구를 진행한다.
  • 클라이언트가 스냅샷을 생성하려고 할 때는 먼저 스냅샷 핸들을 요청한다. 클라이언트는 스냅샷을 생성하는데, 이 스냅샷은 스냅샷 핸들의 정보와 일관되어야 하므로 Paxos가 진행됨에 따라서 업데이트라 발생하는 것에 대해서 주의를 기울여야 한다. 마지막으로 스냅샷이 생성되었다면 클라이언트는 Paxos 프레임워크에 스냅샷 핸들을 전달하면서 스냅샷이 생성되었음을 알린다. 스냅샷의 생성에 실패했다면 단순히 Paxos 프레임워크에 이를 알리지 않는 것으로 충분하다.
  • catch-up을 위해서는 다른 복제본으로부터 최근의 스냅샷을 얻어오고, 나머지 로그 레코드들도 다른 복제본으로부터 얻어온다.

Database transactions

CAS (compare and swap) 오퍼레이션을 atomic하게 유지하기 위해서 CAS에 관련된 데이터를 Paxos 상에서의 하나의 값으로 취급한다. 실제로 데이터베이스 트랜잭션을 구현하지 않더라도, 이러한 방식을 확장함으로써 트랜잭션 스타일의 지원이 가능하다.
MultiOp은 이러한 트랜잭션 지원을 위해 atomic하게 적용되고 guard, top, fop의 세가지 부분으로 이루어져 있다. guard라고 불리는 테스트의 리스트가 한가지 부분이고, 이 모든 테스트가 참이라면 데이터베이스 오퍼레이션의 리스트인 top을 실행하고, 그렇지 않다면 역시 데이터베이스 오퍼레이션의 리스트인 fop을 실행한다.

Software Engineering

알고리즘의 표현, 실행시간 일관성 검증, 테스팅, 동시성 등에 관련한 이슈들을 설명하고 있는데, 특별히 흥미로운 테스팅 항목에 대해 살펴보자.

시스템에 랜덤한 실패들을 집어넣더라도 Safety와 Liveness를 만족하는지를 테스트하는데, 테스트가 실패했을 때 이를 재현할 수 있어야 하므로, 테스트가 시작할 때 설정된 시드를 확보해서 동일한 테스트를 재현할 수 있도록 하고 있다.

이외에도 Chubby 시스템이 하위 시스템의 실패에도 잘 동작하는지를 테스트하기 위해서, 분산 로그 시스템에 여러 hook을 구현하고 역시 랜덤하게 실패하도록 했을 때 Chubby 시스템이 그러한 실패들을 잘 극복하는지를 테스트 하기도 한다.

이러한 테스트 방식은 체계적인 방식은 아니나, 최근 분산시스템을 테스트하는 것에 대해서는 일반적인 방법으로 자리잡아 나가고 있는 것 같다.

Summary

이 논문의 마지막 부분은 컴파일러 개발 분야가 매우 복잡하지만, 이론이 널리 알려져있고 yacc, ANTLR 등의 도구와 같이 현실에도 잘 적용되어있는 점을 지적하면서, 분산시스템 분야는 그렇지 않음을 지적하고 있다.

  • There are significant gaps between the description of the Paxos algorithm and the needs of a real-world system. In order to build a real-world system, an expert needs to use numerous ideas scattered in the literature and make several relatively small protocol extensions. The cumulative effort will be substantial and the final system will be based on an unproven protocol.
  • The fault-tolerance computing community has not developed the tools to make it easy to implement their algorithms.
  • The fault-tolerance computing community has not paid enough attention to testing, a key ingredient for building fault-tolerant systems.

References

Paxos는 Paxos made simple을 통해서 공부했었는데, 시간이 날 때 오리지널 페이퍼인 Lamport, Leslie. "The part-time parliament." ACM Transactions on Computer Systems (TOCS) 16.2 (1998): 133-169.를 읽어보는 것이 좋을 듯 하다.

리스에 관해서는 Gray, Cary, and David Cheriton. Leases: An efficient fault-tolerant mechanism for distributed file cache consistency. Vol. 23. No. 5. ACM, 1989.을 참고하자.

그룹 멤버쉽 문제에 관해서 Cristian, Flaviu. "Reaching agreement on processor-group membership in synchronous distributed systems." Distributed Computing 4.4 (1991): 175-187.

Paper: Dotted version vectors: Logical clocks for optimistic replication (Part 2)

Paper: Dotted version vectors: Logical clocks for optimistic replication (Part 1)

A Kernel for Eventual Consistency

인과성을 이용하는 분산 스토리지의 동작에 있어서 논리적 시계 집합에 대한 sync 와 update 2개의 오퍼레이션이 핵심을 이루고 있다고 주장하고 있다.

먼저 sync 오퍼레이션의 경우에는 두 개의 시계 집합을 취해서 두 집합의 원소들인 논리적 시계들 사이에 인과성의 관계가 있다면 이전에 해당하는 시계를 모두 버리고, 남아있는 원소들로 구성된 집합을 반환하는 오퍼레이션이다. 결과적으로 반환되는 집합은 동시적 (concurrent)인 관계에 있는 시계들로만 이루어지게 된다.

sync는 클라이언트와 서버 도는 서버의 노드들 사이의 동기화가 필요한 시점에 논리적 시계에 기반해서 과거의 값들을 버리기 위한 오퍼레이션이라고 볼 수 있다. 여기서 재미있는 것은 sync는 논리적 시계가 실제로 어떻게 구현되어있는지에 상관없이 시계들 사이의 부분순서 (partial order)만을 이용해서 일반적으로 정의할 수 있다는 것이다.

sync_defined_by_partial_order

 

update 오퍼레이션은 어떤 시계 집합 (통상적으로 클라이언트)과 서버의 어떤 노드의 시계 집합, 서버의 식별자를 취하고 하나의 시계를 반환하는 오퍼레이션이다. 이 시계는 클라이언트 시계 집합 내의 모든 시계들을 dominate하고, 시스템 내의 시계들의 어떤 join에 의해서도 dominate되지 않아야 한다. (즉, dominate하거나 concurrent 해야한다.)

인과적인 이력 (causal histories)의 경우, update 오퍼레이션은 다음과 같이 정의할 수 있다. 시스템 전체에서 고유한 사건 식별자를 얻어서 클라이언트 시계 집합의 각 시계에 추가하는 방식이다.

update_operation_of_causal_histories

이어서 분산 스토리지의 get/put 오퍼레이션에서 위에서 정의한 sync/update 오퍼레이션을 이용해서 논리적 시계를 어떻게 다루는지에 대해서 설명하고 있으나 여기서는 생략하기로 하자.

Dotted Version Vectors

버전 벡터가 (id, m)의 형태로 표기된다면 dotted version vector는 (id, m, n)과 같이 표기할 수 있다. 버전 벡터가 연속적인 인과적인 이력을 표현하고 있다면, dotted version vector는 그 연속적인 이력에 n에 해당하는 독립적인 사건을 추가한 것을 표현할 수 있다.

dotted_version_vector_definition

 

예를 들어, {(a,2),(b,1),(c,3,7)}이라는 dotted version vector는 {a1, a2, b1, c1, c2, c3, c7}와 같은 인과적인 이력을 표현하고 있다.

dotted version vector를 인과적인 이력으로 정의했다면, 부분순서가 어떻게 정의되는 지를 살펴볼 때다.

dotted_version_vector_partial_order

 

 

dotted version vector를 인과적인 이력으로 변환해서 생각하면 당연한 결과라고 할 수 있다.

부분순서가 정의되어있으므로 sync 오퍼레이션은 추가적으로 정의할 필요가 없으나, update 오퍼레이션은 dotted version vector에 대해 정의할 필요가 있다.

dotted_version_vector_update_function

 

이 간결한 식에 매우 많은 의미를 담고 있는데, 합집합(union)의 좌항의 경우, 클라이언트의 시계집합(S)에 속하는 노드의 식별자들 중 파라미터에서 제공한 노드의 식별자 (r)가 아닌 것들에 대해서,  각각의 식별자와 클라이언트 시계집합에서 해당 식별자에 대한 가장 큰 sequence를 pair로 하는 버전 벡터들을 나타낸다. 우항의 경우에는 파라미터에서 제공한 노드의 식별자(r)에 대한 dotted version vector를 구성하고 있는데, 좌항의 경우와 유사하게 첫번째 정수는 클라이언트의 시계집합에서 해당 식별자에 대한 가장 큰 sequence가 된다. 양쪽 모두 클라이언트의 문맥을 표현하는 것이라고 볼 수 있다. 두번째 정수는 조금 특이한데, 파라미터에서 제공한 노드의 시계 집합에서 역시 해당 노드의 식별자에 대한 가장 큰 sequence를 얻은 후 1만큼 증가시켜준 값으로 설정된다. 이는 노드 상의 문맥에서 업데이트로 인한 새로운 사건을 기록한 것이라고 볼 수 있다.

이러한 update 오퍼레이션을 그대로 적용하면 다음과 같이 아름답게 움직이는 시스템이 된다.

dotted_version_vector_operations

 

논문에서는 dotted version vector에 대한 correctness에 대해서 설명하고 있는 듯 하나 생략하도록 한다. 이 논문을 여러번 다시 읽어보았지만 dotted version vector가 어떻게 문제를 해결하는지에 대한 직관적인 설명을 하는 것이 아직도 어려운 것 같다.

dotted_version_vector_reason_of_correctness

Related Work

  • Lamport clock
    • L. Lamport, “Time, clocks and the ordering of events in a distributed system,” Communications of the ACM, vol. 21, no. 7, pp. 558–565, Jul. 1978.
      • http://blog.lastmind.net/archives/720
  • Version vector
    • D. S. Parker, et al., “Detection of mutual inconsistency in distributed systems,” Transactions on Software Engineering, vol. 9, no. 3, pp. 240–246, 1983.
  • Vector clock
    • C. Fidge, “Timestamps in message-passing systems that preserve the partial ordering,” in 11th Australian Computer Science Conference, 1989, pp. 55–66.
      • http://blog.lastmind.net/archives/736
    • F. Mattern, “Virtual time and global clocks in distributed systems,” in Workshop on Parallel and Distributed Algorithms, 1989, pp. 215–226.
  • Dynamic creation and retirement of vector entries
    • R. A. Golding, “A weak-consistency architecture for distributed information services,” Computing Systems, vol. 5, pp. 5–4, 1992.
    • K. Petersen, et al., “Flexible update propagation for weakly consistent replication,” in Sixteen ACM Symposium on Operating Systems Principles, Saint Malo, France, Oct. 1997.
    • P. S. Almeida, et al., “Interval tree clocks,” in Proceedings of the 12th International
      Conference on Principles of Distributed Systems, ser. OPODIS ’08. Berlin, Heidelberg: Springer-Verlag, 2008, pp. 259–274.
  • Scalability problems
    • B. Charron-Bost, “Concerning the size of logical clocks in distributed systems,” Information Processing Letters, vol. 39, pp. 11–16, 1991.
    • D. H. Ratner, “Roam: A scalable replication system for mobile and distributed computing,” Ph.D. dissertation, 1998, uCLA-CSD-970044.
    • R. Prakash and M. Singhal, “Dependency sequences and hierarchical clocks: Efficient alternatives to vector clocks for mobile computing systems,” Wireless Networks, pp. 349–360, 1997, also presented in Mobicom96.
    • P. Mahajan, S. Setty, S. Lee, A. Clement, L. Alvisi, M. Dahlin, and M. Walfish, “Depot: Cloud storage with minimal trust,” in OSDI 2010, Oct. 2010.
    • D. Malkhi and D. B. Terry, “Concise version vectors in winfs,” in DISC, ser. Lecture Notes in Computer Science, P. Fraigniaud, Ed., vol. 3724. Springer, 2005, pp. 339–353.
    • V. Ramasubramanian, et al., “Cimbiosys: a platform for contentbased partial replication,” in Proceedings of the 6th USENIX symposium on Networked systems design and implementation. Berkeley, CA, USA: USENIX Association, 2009, pp. 261–276.
    • F. J. Torres-Rojas and M. Ahamad, “Plausible clocks: constant size logical clocks for distributed systems,” Distributed Computing, vol. 12, no. 4, pp. 179–196,
      1999.
  • Trade-off
    • B. B. Kang, R. Wilensky, and J. Kubiatowicz, “The hash history approach for reconciling mutual inconsistency,” in Proceedings of the 23nd International Conference
      on Distributed Computing Systems (ICDCS). IEEE Computer Society, 2003, pp. 670–677.

 

Paper: Challenges to Adopting Stronger Consistency at Scale

AJOUX, Phillipe, et al. Challenges to Adopting Stronger Consistency at Scale. In: 15th Workshop on Hot Topics in Operating Systems (HotOS XV). USENIX Association. (pdf, slides)

최근 geo-replicated data store에서 높은 consistency를 제공하기 위한 연구들이 많이 있다고 한다. – 이 논문에서 나열하고 있는 그런 논문들의 목록도 흥미로운데 이는 마지막에 한번 정리해보자. – Facebook에서도 사용자들에게 – 특히, 읽기에 대한 – end-to-end consistency를 제공하기 위해서 이러한 연구 결과들을 도입하고 싶으나, 여러가지 이유로 이것이 쉽지가 않은데, 그 이유가 무엇인지 일단 알아보자…라는 것이 이 논문의 취지.

There have been many recent advances in distributed systems that provide stronger semantics for geo-replicated data stores like those underlying Facebook. These research systems provide a range of consistency models and transactional abilities while demonstrating good performance and scalability on experimental workloads. At Facebook we are excited by these lines of research, but fundamental and operational challenges currently make it infeasible to incorporate these advances into deployed systems. This paper describes some of these challenges with the hope that future advances will address them.

Facebook은 확장성과 효율을 위해서 샤딩, 복제, 캐싱 등을 아주 많이 활용하고 있는 것으로 보인다. Sodcial graph 시스템을 예로 들고 있는데, 하나의 node나 edge가 MySQL(!)에 쓰여지면, 이것이 비동기적으로 다른 데이터 센터로 복제되고, 여러 계층의 캐시들을 업데이트하고, pub-sub 시스템에 의해 검색이나 뉴스피드를 위한 다른 시스템으로 전달된다. 당연한 이야기지만, Facebook 내의 여러 기능에 해당하는 시스템들은 효율을 위해 각자 특별한 샤딩 방식이나 캐시, 인덱스들을 유지해야한다. Twitter (http://blog.lastmind.net/archives/664)나 LinkedIn (http://blog.lastmind.net/archives/657)의 시스템과 같이 Primary Data Store (DB)로부터 비동기적인 복제를 통해 Secondary Index를 생성하거나 다른 서비스로 이벤트를 전달하는 방식은 현재의 대규모 인터넷 서비스에서는 매우 공통적인 부분으로 보인다.

Facebook relies on sharding, data replication, and caching to efficiently operate at scale. When a node or edge is added to the social graph, it is first written to a single MySQL instance. From there it is asynchronously replicated to all of our data centers, updated or invalidated in multiple layers of cache, and delivered through a scalable publisher-subscriber system to other services such as Search and News Feed. No single data placement strategy can efficiently serve all workloads in a heavily sharded system, so many of these services choose a specialized sharding function and maintain their own data store, caches, and indexes.

facebook_data_layout

Fundamental Challenges

Integrating Across Stateful Services

Facebook의 Social graph는 여러 서비스들을 위한 시스템에 각 서비스를 위한 형태로 복제되고 있다고 한다. 문제는, 대부분의 연구 결과들에서 나타나는 설계들은 단일한 서비스를 상정하고 있다는 것이다. Facebook는 그러한 여러한 서비스가 따로 사용자에게 제공되기 보다는 하나의 서비스처럼 제공되므로 여러가지 문제들이 발생하게 된다.

Facebook’s architecture of cooperating heterogeneous services is different from the monolithic service that
most research designs assume. This difference alone is not fundamental—Facebook can be externally viewed as a single service even though internally it is comprised of many services.

Storing Data Consistently Across Services

Facebook의 각 서비스들은 Social graph의 일부에 대한 캐시, 인덱스, 복제본 등을 가지고 있다. Wormhole이라는 Facebook의 pub-sub 시스템은 업데이트를 비동기적으로 여러서비스들에 전달해주고 있다.

In a sharded and scaled environment like Facebook’s, services may maintain their own optimized cache, index, or copy of a portion of the social graph. Wormhole [40] is our scalable pub-sub system that asynchronously delivers update notifications to all of the services that manage state.

이 때, 만약 높은 consistency를 제공하기 위해서는 동기적으로 업데이트 통지를 전달할텐데, availability와 latency를 떨어뜨리는 요인이 된다. 인과성을 위한 시스템을 사용하더라도 여러 서비스들 사이의 의존관계를 표현하는 것은 굉장한 복잡도를 가져오게 된다.

To provide strong consistency across our services, we would need to propagate update notifications synchronously impacting availability and latency. A causal system could avoid atomic updates, but would still need to be integrated into every service and threaded through every communication channel greatly increasing the complexity of the dependency graph.

Decentralized Access to Services

클라이언트는 한 화면에서 표시되는 여러 컨텐츠를 별도의 리퀘스트를 통해 가져와서 클라이언트에서 통합한다. 이 때문에 클라이언트는 일관되지 않은 결과들을 가지고 session-based guarantee를 보장함으로써 사용자에게 일관된 상태를 보여주어야 한다.

The Facebook web site and its mobile applications use parallelism and pipelining to reduce latency. The notification bar and the central content are fetched by separate requests, for example, and then merged at the client. Mobile applications also aggressively cache results using local storage, and then merge query results with previously seen values. […] but it leaves the user’s device as the only safe location to define session-based guarantees or demarcate isolation boundaries.

단일한 사용자는 보통 동일한 지역의 동일한 클러스터로 라우팅 되므로 이러한 성질은 사용자가 inconsistency를 경험할 확률을 낮추어준다. 하지만, 그러한 성질이 보장되는 것은 아니므로 역시 문제는 있다.

Requests for a single user are usually routed to the same cluster in the same region. This routing stability assists in reducing the number of user-visible inconsistencies, because most of the variance in asynchronous update propagation is inter-region.

Composing Results

조금 설명이 애매하기는 한데, 여러 서비스로부터의 결과를 조합해야할 경우, 그 사이의 의존관계를 다루어야 하는데, 각 서비스는 각자의 서비스를 위한 일부의 결과를 가지고 있기 때문에, 이를 일반적으로 처리하기가 쉽지 않다는 의미인 듯 하다.

Each service can be thought of as implementing its own limited query language, so it is not clear that the dependencies can be captured in a way that is both sufficiently generic and sufficiently precise.

Query Amplification

사용자 입장에서는 하나의 쿼리지만, 내부적으로는 여러 서비스들에 대한 수천개의 쿼리로 실행될 수 있다는 이야기.

For example, a user’s home page queries News Feed to get a list of stories, which depends on a separate service to ensure story order of previously seen content. These stories are then fetched from TAO to get the content, list of comments, likes, etc. In practice, this behavior produces fork/join query patterns that have fan-out degrees in the hundreds and critical paths dozens deep.

Slowdown Cascades

동일한 데이터가 각각의 서비스에서는 서로 다른 샤딩 방식과 인덱싱을 통해 저장되므로, 서로 다른 서비스들의 샤드 사이에 all-to-all ordering 의존 관계가 발생하고, 높은 consistency를 요구하는 시스템에서는 하나의 서비스에서 하나의 샤드만 실패 또는 지연되더라도 이에 의존하는 모든 다른 서비스의 모든 샤드에 영향을 줄 수 있다는 이야기.

Different services use different sharding and indexing schemes for the same data. For instance, TAO shards data based on a 64-bit integer id, News Feed shards based on a user id, and the secondary index system shards based on values. The use of different sharding schemes creates all-to-all ordering dependencies between shards in different services. This connectedness accelerates slowdown propagation in strongly consistent systems. A failure in one shard of one service would propagate to all shards of a downstream service.

Latency Outliers

여러 쿼리들을 조합해야하는 경우, 하나의 쿼리만 느리더라도 최종적인 응답에 영향을 미칠 수 있음. 이것이 strong consistency와 직접적인 연관성이 있는지는 명확하게 이해하기 어려우나, strongly consistent system의 응답속도나 outlier들이 그렇지 않은 시스템들에 비해 더 높기 때문에 언급되는 것 같음.

Parallel subqueries amplify the impact of latency outliers. A user request does not return until it has joined the results of all of its queries, and thus must wait for the slowest of its internal requests.

Linchpin Objects

데이터의 개수도 많고 매우 자주 읽히는데다 쓰기도 많은 무언가를 Linchpin Object라고 부르고 있다. 흔히 Facebook의 연예인 페이지나 유명 대기업의 페이지를 상상하면 될 것 같다. 역시 좋은 성능을 제공하는 것이 strongly consistent system에서는 중요하다고 얘기하고 있다.

The most frequently read objects are often also frequently written to and highly connected. Examples of these linchpin objects include popular users like celebrities, popular pages like major brands, and popular locations like tourist attractions. […] A system that provides stronger consistency must provide good performance for these linchpin objects.

Net Benefit to Users

stronger consistency는 프로그래머가 프로그램의 정확성에 대해 생각하기 편리하고, 사용자에게 일관성을 제공한다는 측면에서 이점을 가지고 있지만, 이를 정량적으로 측정하기는 어려울 뿐만 아니라, 그러한 이점들이 단점들 – 커뮤니케이션 오버헤드, 무거운 상태관리, 응답속도의 증가 – 을 넘어서지 않을지도 모름.

Systems with strong consistency are easier for programmers to reason about and build on top of [6, 12] and they provide a better experience for users because they avoid some anomalous application behavior. However, these benefits are hard to quantify precisely, and they do not appear in isolation. When all aspects of user experience are considered it might even be the case that the benefit does not outweigh the detriment. […] Stronger properties are provided through added communication and heavier-weight state management mechanisms, increasing latency. Although optimizations may minimize these overheads during failure-free execution, failures are frequent at scale. Higher latency may lead to a worse user experience, potentially resulting in a net detriment.

Operational Challenges

Fully Characterizing Worst-Case Throughput

여러 서비스가 얽혀있는 복잡한 시스템에서 실패나 복구 방법에 대해 미리 예측하거나 관찰하기 쉽지 않을 뿐더러 이를 정형화하기도 쉽지 않다는 얘기.

While there is no theoretical impediment to preserving worst-case throughput despite strengthening Facebook’s consistency guarantees, our experience is that failure and recovery behavior of a complex system is very difficult to characterize fully. Emergent behavior in cross-service interactions is difficult to find ahead of time, and may even be difficult to identify when it is occurring

Polyglot Environment

(아마도 백엔드) 서비스 내에서 C++이 가장 보편적으로 사용되는 언어인 것은 매우 신기함.

While C++ is the predominant language for standalone services at Facebook, Python, Java, Haskell, D, Ruby and Go are also supported by Thrift, our inter-service serialization format. The application code coordinating service invocations is generally written in Hack and PHP, and final composition of query
results may be performed by JavaScript in the browser, Objective C/C++ on IOS, or Java on Android.

여러 언어를 사용하고 있기 때문에 인터페이스나 쓰레딩 모델 등에 대해 가정하는 것이 어렵다는 이야기.

Ports or foreign function interfaces must be provided to give all of these codebases access to an end-to-end
consistency platform. Perhaps the trickiest part of this multi-language support is the wide variety of threading models that are idiomatic for different languages and runtimes, because it is likely the underlying consistency system will need to initiate or delay communication.

Varying Deployment Schedules

낮은 레벨의 서비스들은 매우 보수적인 deployment 프로세스를 가지고 있으므로, 이에 관련한 시스템을 빠르게 개발하는 것은 쉽지 않을거라는 이야기.

Facebook has an extremely aggressive deployment process for the stateless application logic, but we are necessarily more conservative with stateful and mature low level services. An inter-service coordination mechanism that is used by or interposes on these low-level services will have a deployment velocity that is constrained by the release engineering requirements of the most conservative component.

Reduced Incremental Benefit in a Mature System

consistency에 관해 이미 적용된 workaround들이 있으므로, 이를 일반적으로 개선하는 시스템의 이득이 떨어진다는 이야기.

While these workarounds would not exist in a newly built system, their presence in Facebook reduces the incremental benefits of deploying a generic system for stronger consistency guarantees.

Related Work

Prior Facebook Publications

  • memcache cache
    • B. Atikoglu, et al. Workload analysis of a large-scale keyvalue store. In SIGMETRICS, 2012.
    • R. Nishtala, et al. Scaling memcache at facebook. In NSDI, 2013.
  • TAO graph store
    • N. Bronson, et al. Tao: Facebook’s distributed data store for the social graph. In USENIX ATC, 2013.
  • Wormhole pub-sub system
    • Y. Sharma, et al. Wormhole: Reliable pub-sub to support geo-replicated internet services. In NSDI, May 2015
  • a characterization of load imbalance in the caches
    • Q. Huang, et al. Characterizing load imbalance in real-world networked caches. In HotNets, 2014.
  • an analysis of the messages use case
    • T. Harter, et al. Analysis of hdfs under hbase: A facebook messages case study. In FAST, 2014.
  • understanding and improving photo/BLOB storage and delivery
    • D. Beaver, et al. Finding a needle in haystack: Facebook’s photo storage. In OSDI, 2010.
      Q. Huang, et al. An Analysis of Facebook Photo Caching. In SOSP. ACM, 2013.
    • S. Muralidhar, et al. f4: Facebook’s warm blob storage system. In OSDI, 2014.
    • L. Tang, et al. RIPQ: Advanced Photo Caching on Flash For Facebook. In FAST, Feb. 2015.

Systems with Stronger Semantics

  • replicated state machines
    • L. Lamport. Time, clocks, and the ordering of events in a distributed system. Comm. ACM, 21(7), 1978.
    • F. B. Schneider. Implementing fault-tolerant services using the state machine approach: a tutorial. ACM Computer Surveys, 22(4), Dec. 1990.
  • causal and atomic broadcasts
    • K. P. Birman and R. V. Renesse. Reliable Distributed Computing with the ISIS Toolkit. IEEE Comp. Soc. Press, 1994.
  • systems designed for high availability and stronger consistency
    • K. Petersen, et al. Flexible update propagation for weakly consistent replication. In SOSP, Oct. 1997.
  • scalable causal consistency for datacenter-scale and/or geo-replicated services
    • S. Almeida, et al. Chainreaction: a causal+ consistent datastore based on chain replication. In EuroSys, 2013.
    • P. Bailis, et al. Bolton causal consistency. In SIGMOD, 2013.
    • J. Du, et al. Orbe: Scalable causal consistency using dependency matrices and physical clocks. In SOCC, 2013.
    • J. Du, et al. Gentlerain: Cheap and scalable causal consistency with physical clocks. In SOCC, 2014.
    • W. Lloyd, et al. Don’t settle for eventual: Scalable causal consistency for wide-area storage with COPS. In SOSP, Oct. 2011.
    • W. Lloyd,  et al. Stronger semantics for low-latency geo-replicated storage. In NSDI, Apr. 2013.
  • strong consistency for datacenter-scale and/or geo-replicated services
    • J. C. Corbett, et al. Spanner: Google’s globally distributed database. In OSDI, Oct. 2012.
    • R. Escriva, et al. HyperKV: A distributed, searchable key-value store for cloud computing. In SIGCOMM, 2012.
    • L. Glendenning, et al. Scalable consistency in Scatter. In SOSP, Oct. 2011.
    • C. Li, D. Porto, A. Clement, J. Gehrke, N. Preguic¸a, and R. Rodrigues. Making geo-replicated systems fast as possible, consistent when necessary. In OSDI, Oct. 2012.
    • J. Ousterhout, P. Agrawal, D. Erickson, C. Kozyrakis, J. Leverich, D. Mazi`eres, S. Mitra, A. Narayanan,
      M. Rosenblum, S. M. Rumble, E. Stratmann, and R. Stutsman. The case for ramcloud. ACM SIGOPS Operating Systems Review, 43(4), 2010.
    • D. B. Terry, V. Prabhakaran, R. Kotla, M. Balakrishnan, M. K. Aguilera, and H. Abu-Libdeh. Consistency-based service level agreements for cloud storage. In SOSP, 2013.
  • transactions for datacenter-scale and/or geo-replicated services
    • P. Bailis, et al. Scalable atomic visibility with ramp transactions. In SIGMOD, 2014.
    • J. Baker, et al. Megastore: Providing scalable, highly available storage for interactive services. In CIDR, Jan. 2011.
    • J. C. Corbett, et al. Spanner: Google’s globally distributed database. In OSDI, Oct. 2012.
    • T. Kraska, et al. Mdcc: Multi-data center consistency. In EuroSys, 2013.
    • W. Lloyd, et al. Stronger semantics for low-latency geo-replicated storage. In NSDI, Apr. 2013.
    • S. Mu, et al. Extracting more concurrency from distributed transactions. In OSDI, 2014.
    • Y. Sovran, et al. Transactional storage for geo-replicated systems. In SOSP, Oct. 2011.
    • A. Thomson, et al. Calvin: fast distributed transactions for partitioned database systems. In SIGMOD, May 2012.
    • C. Xie, et al. Salt: combining acid and base in a distributed database. In OSDI, 2014.
    • Y. Zhang, et al. Transaction chains: achieving serializability with low latency in geo-distributed storage systems. In SOSP, pages 276–291. ACM, 2013.

Discussions of Challenges to Stronger Consistency

  • criticism of enforcing an order in a communication substrate instead of end to end
    • D. R. Cheriton, et al. Understanding the limitations of causally and totally ordered communication. In
      SOSP, Dec. 1993.
  • slowdown cascades to motivate enforcing only an explicit subset of causal consistency
    • P. Bailis, et al. The potential dangers of causal consistency and an explicit solution. In SOCC, 2012.
  • detail the importance and challenges for tail latency in high-scale services at Google
    • J. Dean, et al. The tail at scale. Comm. ACM, 56(2), 2013.

My Opinion

Strongly consistent system의 연구자들에게 방향을 제시하기 위해 Facebook의 시스템이라는 도메인 내에서 발생하는 숙제들을 정의하고 있는 논문이라고 볼 수 있는데, 마치 Facebook에서 strongly consistent system을 일반적으로 도입해서는 안되는 이유들을 정리해놓은 듯한 느낌이 들었다. 오히려 나의 관심은 Facebook에서 데이터의 복제에 관한 시스템과 복제본 사이의 consistency 문제를 어떻게 다루고 있는가 였는데, 우리가 고심하고 있는 문제들이나 해결 방향이 상당히 닮아 있다는 인상을 받았다. 여러 서비스에 걸친 업데이트 통지를 위해 인프라로서 pub-sub을 적극적으로 활용하고 있는 점이나, 서비스별로 소셜 그래프의 별도 인덱스나 캐시 등을 잘 구축해서 활용하고 있는 점은 잘하고 있다고 생각했다. Facebook의 graph storage인 TAO, pub-sub 시스템인 Wormhole 등에 대해 관심이 생겼다.

Paper: Dotted version vectors: Logical clocks for optimistic replication (Part 1)

PREGUIÇA, Nuno, et al. Dotted version vectors: Logical clocks for optimistic replication. arXiv preprint arXiv:1011.5808, 2010.

Dynamo, Cassandra, Riak, Voldemort와 같은 시스템들은 쓰기 가용성 (write availability)을 보장하기 위해, 어떤 하나의 데이터 항목의 여러 복제본이 동일한 데이터 항목의 복제본이 서로 다른 값으로 갈라질 (diverge) 수 있고, 이를 나중에 수리(repair)하는 방법을 고안하고 있다. 이 때, 복제본 버전들을 비교해서 어느 한쪽이 새로운 업데이트라서 다른 복제본들을 교체할 수 있는지, 아니면 동시적 (concurrent)이라서 한지를 semantic reconciliation을 필요로하는 지를 결정할 수단을 필요로 한다. 이러한 비교는 업데이트 사이의 인과적인 의존성 (causal dependency)에 따라 판단하는데, 인과적인 이력 (Causal History)을 모두 기록하는 것은 비효율적이므로, 이 정보를 요약하는 동시에 인과적인 의존성을 따질 수 있는 수단으로 람포트 시계(Lamport Clock)나 버전 벡터(Version Vector) 등의 개념들이 제안되고 실제로 위와 같은 시스템들에서 활용되고 있다.

이 논문에서 도입하고 있는 Dotted Version Vector는 인과적인 이력을 요약하기 위한 이러한 수단들이 가진 정확성(Correctness)의 한계를 개선하고, 동시에 확장성(Scalability)을 가지는 해결 방법으로서 제시되고 있다.

우선 이번 글에서는 논문의 전반부에 소개된, 인과성의 추적을 위해 사용되는 여러 방법들과 한계들에 대해서 알아보도록 하자.

1. 인과적인 이력 (Causal Histories)

causal_histories

우리가 다루고자 하는 클라이언트-서버로 구성된 스토리지 시스템에서, 클라이언트가 서버로 어떤 작업을 요청할 때마다  인과적인 관계가 발생한다. 이러한 관계를 formal하게 표현하기 위한 방법 중 하나가 인과적인 이력 (Mattern, 1994)이다. 인과적인 이력은 업데이트라는 사건들에 대한 유일한 식별자(identifier)의 집합으로 표현할 수 있다. 이 때 업데이트 사건의 식별자는 노드(리플리카 또는 클라이언트)의 식별자와 단조증가하는 카운터를 합쳐서 만들 수 있다. 업데이트가 발생할 때마다 새로운 식별자가 식별자의 집합에 추가하게 되고, 이 집합의 포함관계를 비교해서 인과성의 부분순서(partial order)를 추적할 수 있다. 두 집합이 서로를 포함하고 있지 않다면 동시적이라고 할 수 있다.

인과적인 이력은 개념적으로는 단순하지만, 업데이트에 따라서 식별자의 집합이 선형적으로 커지게 되므로, 실용적인 시스템에 사용하는 것은 적절하지 않다.

2. 인과적으로 호환되는 완전 순서 (Causally compliant total order)

real_time_clock

업데이트들 사이의 인과적인 의존성과 호환되는 완전 순서를 정할 수 있다면, 이를 이용해 last writer wins 정책을 적용할 수 있다. 다른 방식들과 달리, 복제본 노드 (Replica Node)는 하나의 값만 유지하면 되고, 쓰기를 위해 읽기 문맥 (get context)에 해당하는 정보를 제공할 필요가 없기 때문에 매우 단순한 시스템을 얻을 수 있다는 장점이 있다. 문제는 이러한 완전 순서는 인과적인 의존성과 호환되지만, 동시적인 (concurrent) 업데이트들도 정렬해버리기 때문에, 어떤 동시적인 업데이트는 last writer wins 정책에 의해 잃어버리게 된다는 것이다.

2.1. 물리적인 시계

클라이언트들의 시계들이 잘 동기화된다면, 업데이트들을 물리적인 시계의 시간 순서에 따라 정렬함 (동시에 일어난 사건은 프로세스 ID를 이용해 정렬)으로써 완전 순서를 얻을 수 있다. 물리적인 시계에 기반한 완전 순서를 사용하는 방식은 Cassandra 0.6.x나 Dynamo에서 일부 애플리케이션에 대해 버전 벡터의 대안으로 사용되었다고 한다.

물리적인 시계에 기반할 경우의 문제점은 역시 클라이언트들의 시계들이 동기화에서 벗어날 때 발생한다. 시계가 느린 복제본 노드나 클라이언트는 항상 동시적인 업데이트 사이의 경쟁에서 지기 때문에, 항상 동시적인 업데이트를 잃어버리는 문제가 발생한다.

2.2. 램포트 시계 (Lamport Clock)

예전의 글에서 소개했듯이 램포트 시계는 역시 완전 순서를 위해 사용될 수 있는 논리적인 시계의 작동 방식을 제공하고 있다.

3. 서버별 항목을 가진 버전 벡터 (Version vectors with per-server entry)

version_vectors_with_per_server_entries

서버별 항목을 가진 버전 벡터를 이용해 인과성을 추적하는 방법은 다음과 같이 작동한다.

  1. 클라이언트가 GET을 실행할 때 값에 반영된 사건들의 인과적인 이력를 나타내는 버전 벡터를 받음.
  2. 그 클라이언트가 PUT을 실행할 때, 이전의 GET에서 받았던 버전 벡터를 함께 보냄.
  3. PUT을 실행하는 서버는 새로운 업데이트를 반영하기 위해 로컬 카운터를 증가시키고, 서버의 식별자에 해당하는 버전 벡터의 항목에 저장.
  4. 새로운 버전 벡터를 서버에 저장되어 있는 다른 버전 벡터와 비교하고, 낡은 버전들을 모두 버린다.

서로 다른 서버들 사이의 업데이트들 사이의 인과성을 추적하는 것이 가능하지만, 동일한 서버에서 발생한 업데이트들 사이의 인과성을 추적할 수 없다. 즉, 동일한 서버에서 발생한 동시적인 업데이트는 또 다시 last writer wins 정책이 적용되므로, 적어도 하나의 동시적인 업데이트는 잃어버릴 수 밖에 없다. Plausible Clocks에서 설명하듯이, 이러한 문제의 본질적인 원인은, 동시적인 업데이트를 발생시키는 근원에 해당하는 클라이언트의 수에 비해서 적은 수의 버전 벡터 항목을 사용하기 때문이다.

이러한 방식은 Dynamo가 사용하고 있다.

4. 클라이언트별 항목을 가진 버전 벡터 (Version vectors with per-client entry)

version_vectors_with_per_client_entries

서버별 항목을 가진 버전 벡터에서 살펴본 문제를 해결하기 위한 가장 자연스러운 접근 중 하나는 클라이언트별 항목을 가진 버전 벡터를 사용하는 것이다. 동작 방식은 다음과 같다.

  1. 클라이언트가 GET을 실행할 때 값에 반영된 사건들의 인과적인 이력을 나타내는 버전 벡터를 받음.
  2. 그 클라이언트가 PUT을 실행할 때, 이전의 GET에서 받았던 버전 벡터, 그리고 클라이언트의 식별자와 클라이언트별로 단조증가하는 카운터를 함께 보냄.

이 방식은 서로 다른 클라이언트들에 의해 발생한 동시적인 업데이트들 사이의 인과성을 완전히 추적할 수 있지만, 클라이언트 당 하나의 항목을 필요로 하기 때문에, 버전 벡터들의 크기가 클라이언트들의 수에 비례하게 되고, 이 방식을 실용적으로 사용할 수 없게 만드는 원인이 된다.

 

Kafka: a Distributed Messaging System for Log Processing

Kafka: A distributed messaging system for log processing In Proceedings of 6th International Workshop on Networking Meets Databases (NetDB), Athens, Greece (2011) by J. Kreps, N. Narkhede, J. Rao

Kafka는 기존의 메시징 시스템에서 당연하다고 가정하고 있었지만, 로그 처리 시스템에서는 필요없는 보장들을 과감하게 버리고, 성능 위주의 설계를 함으로써, 실제로 링크 속도에 육박하는 성능을 보여주고 있고, 상당히 단순한 아키텍쳐를 유지하고 있는 점이 흥미로운 점입니다. 그리고, 현실의 데이터 처리에 있어서는 버그나 장애로 인해 흔히 발생하는 재처리가 pull 기반의 소비 모델을 도입함으로써 매우 쉽게 가능해졌다는 점도 눈여겨 볼 부분입니다.

주의: 이 글은 2011에 출판된 Kafka의 페이퍼 내용을 다루고 있으며, 현재의 Kafka 버전의 내용을 다루고 있지 않습니다.

Problem

기존의 엔터프라이즈 메시징 시스템들의 한계를 다음과 같은 이유들로 설명하고 있습니다.

  • 배달 보장 (delivery guarantee)을 위한 기능들은 로그 처리를 위한 시스템 입장에서는 불필요..
  • 처리속도 (throughput)를 디자인 제약으로 고려하지 않음.
  • 분산에 대한 고려가 부족.
  • 메시지가 즉시 소비되는 것을 전제로 하기 때문에 메시지가 쌓일 경우 성능이 하락.

마찬가지로 최근에 만들어진 로그 처리 시스템들 – Scribe, Yahoo’s data highway project, Flume의 한계는 다음과 같은 이유들로 설명하고 있습니다.

  • 로그 데이터를 오프라인으로 처리하는 것을 위해 만들어진 시스템.
  • 대부분은 push 모델.

Kafka Architecture and Design Principles

Kafka의 주요 개념들과 개략적인 디자인은 다음과 같습니다.

kafka_architecture

  • 토픽 (topic): 특정한 타입의 메시지의 스트림
  • 프로듀서 (producer): 어떤 토픽에 대해 메시지를 발행 (publish)할 수 있음.
  • 프로듀서에 의해 발행된 메시지들은 브로커 (broker)라는 서버들에 저장됨. Kafka 클러스터는 일반적으로 여러 브로커들로 이루어짐.
  • 컨수머 (consumer)는 브로커로부터 1개 이상의 토픽을 구독할 수 있고, 브로커들로부터 데이터를 당김(pull)으로써 구독한 메시지들을 소비 (consume)할 수 있음.
    • point-to-point 배달 모델: 여러 컨수머가 하나의 토픽 내의 메시지를 하나씩 소비.
    • 발행/구독 (publish/subscribe) 모델: 여러 컨수머가 하나의 토픽의 각자의 복제본 메시지들을 소비.
  • 토픽은 여러 파티션들로 이루어져 있고, 각각의 브로커는 하나 이상의 파티션을 저장.

Efficiency on a Single Partition

Simple storage

  • 어떤 토픽의 각 파티션은 논리적인 로그에 해당.
  • 물리적으로 로그는 거의 동일한 크기의 세그먼트 파일들의 집합으로 구현됨.
  • 프로듀서가 어떤 파티션에 메시지를 발행할 때마다 브로커는 마지막 세그먼트 파일에 메시지를 append 한다.
  • 성능을 위해서 특정 개수의 메시지가 발행되거나 특정 시간이 흐른 후에 세그먼트 파일을 디스크로 flush한다.
    • 메시지는 flush가 된 후에 컨수머에게 노출됨.
  • 전형적인 메시징 시스템과 달리, 카프카의 메시지에는 메시지 식별자가 없고 로그 상의 논리적인 오프셋 (offset)을 주소로 사용.
    • 메시지 식별자로부터 메시지를 찾기 위한 인덱스 구조를 유지할 필요가 없어짐.
    • Kafka의 메시지 식별자 – 로그 상의 논리적인 오프셋은 자연히 증가하지만, 연속적이지는 않음.
  • 컨수머는 항상 특정한 파티션으로부터 연속적으로 메시지를 소비함.
  • 컨수머가 특정한 메시지 오프셋에 대해 ack을 한다면, 이는 그 파티션의 해당 오프셋 이전의 모든 메시지를 받았음을 의미.
  • 컨수머는 브로커에게 비동기적인 당김 (pull) 요청을 보낸다. 각 요청은 소비할 메시지의 오프셋과 최대로 가져올 메시지의 크기를 포함.
    • 브로커는 모든 세그먼트 파일의 첫번째 메시지의 오프셋들의 목록을 메모리 상에 유지.
    • 브로커는 그 목록을 세그먼트 파일을 찾고, 컨수머에게 데이터를 보낸다.
    • 컨수머가 메시지를 받은 후에는 다음 메시지의 오프셋을 계산해서 다음 요청에서 사용.

kafka_log

Efficient transfer

  • 프로듀서는 한번의 send 요청에 여러 메시지를 보낼 수 있음.
  • 컨수머 API는 한번에 하나의 메시지를 소비하는 것처럼 보이지만, 내부적으로는 각 당김 요청은 특정 사이즈 (보통 수백 KB) 내에 해당하는 여러 메시지를 가지고 옵니다.
  • 메시지들을 직접 메모리 상에 캐싱을 하지 않고 파일 시스템 페이지 캐시에 의존함.
    • 중복된 버퍼링을 피함.
    • 프로세스 내에서 메시지를 캐시하지 않기 때문에 가비지 컬렉션의 오버헤드가 매우 적어짐.
    • 프로듀서와 컨수머는 세그먼트 파일을 순차적으로 액세스하고, 보통 컨수머는 프로듀서보다 살짝 뒤쳐지기 때문에, 보통의 OS 캐싱 휴리스틱 (write through, read-ahead)이 매우 효과적으로 동작함.
  • sendfile API를 이용해서 복사와 시스템 콜 회수를 줄임.

Stateless broker

  • 대부분의 다른 메시징 시스템과 다르게, 각각의 컨수머가 얼마나 소비했는지는 브로커가 유지하지 않음.
    • 이러한 디자인은 브로커의 복잡도와 오버헤드를 줄임.
  • 문제는 이미 소비한 메시지를 언제 지울지 결정하는 것이 어려워짐.
    • Kafka는 시간 기준의 SLA를 사용해서 해결하는데, 특정 기간 (보통 1주일) 이상이 지나면 메시지는 자동적으로 삭제됨.
      • 대부분의 컨수머는 비교적 짧은 시간 단위 – 매일, 시간별, 실시간 – 내에 소비를 마치기 때문에 문제가 없음.
      • 카프카는 데이터가 크기에 따라서 성능이 저하되지 않으므로 이러한 방법이 가능.
  • 부가적인 이점으로, 컨수머는 이전의 오프셋으로 되돌아가 메시지를 다시 소비하는 것이 가능해짐.
    • 예를 들어, 컨수머의 로직에 에러가 있을 경우, 에러가 수정된 후에 문제가 된 메시지들을 다시 처리하는 것이 가능.
    • 컨수머가 크래시한다면 flush되지 않은 데이터는 잃어버리겠지만, flush하지 않은 가장 작은 오프셋부터 메시지를 다시 소비하는 것이 가능.
    • 이러한 메커니즘은 push 모델보다 pull 모델에서 훨씬 쉬움.

3.2 Distributed Coordination

  • 각각의 프로듀서는 랜덤하게 선택된 파티션이나 파티셔닝 키와 파티셔닝 함수에 의해 결정된 파티션으로 메시지를 발행할 수 있다.
  • 컨수머 그룹 (consumer group)이라는 개념
    • 토픽의 집합을 consume하는 하나 이상의 컨수머들로 구성.
    • 서로 다른 컨수머 그룹은 독립적으로 구독한 메시지들의 전체 집합을 consume하고, 컨수머 그룹 끼리는 조정?(coordination)이 불필요.
  • 어떤 토픽 내에서 하나의 파티션은 병행성의 최소 단위
    • 주어진 시점에 하나의 파티션으로부터의 모든 메시지는 각 컨수머 그룹 내에서 하나의 컨수머에 의해서만 소비됨.
    • 하나의 파티션을 여러 컨수머가 동시에 소비하도록 했다면 누가 어떤 메시지를 소비할지는 조정해야하는 오버헤드가 발생.
    • 로드를 균형 있게 맞추기 위해서는 파티션의 수는 컨수머의 수보다 더 많아지도록 유지.
  • 시스템을 단순화하기 위해 중앙의 마스터 노드를 가지지 않도록 함.
    • 조정을 위해서 Zookeeper를 사용.
    • Kafka가 ZooKeeper를 사용하는 작업들
      • 브로커와 컨수머의 추가와 제거를 탐지
      • 브로커나 컨수머가 추가/제거될 경우 각 컨수머에게 리밸런스 프로세스를 트리거.
      • 소비 관계를 유지하고, 각 파티션의 소비된 오프셋을 추적
        • 브로커나 컨수머가 시작하면 주키퍼의 브로커 또는 컨수머 레지스트리에 그 정보를 저장.
        • 브로커 레지스트리는 브로커의 호스트 이름과 포트, 토픽과 파티션의 집합을 포함.
        • 컨수머 레지스트리는 컨수머가 속한 컨수머 그룹과, 구독하고 있는 토픽들의 집합을 포함.
        • 각 컨수머 그룹은 소유권 레지스트리와 오프셋 레지스트리와 연관됨.
        • 소유권 레지스트리는 모든 구독된 파티션에 대해 하나의 ZooKeeper 경로를 가지며, 그 값은 현재 그 파티션을 소비하고 있는 컨수머의 ID.
        • 오프셋 레지스트리는 각각의 구독된 파티션에 대해 파티션 내에서 마지막으로 소비된 오프셋을 저장.
      • 브로커 레지스트리, 컨수머 레지스트리, 소유권 레지스트리에 대한 주키퍼 경로들은 ephemeral 경로.
      • 오프셋 레지스트리에 대해서는 persistent 경로.
      • 브로커가 실패하면 그 위의 모든 파티션은 자동적으로 브로커 레지스트리에서 삭제됨.
      • 컨수머의 실패는 컨수머 레지스트리에서 컨수머에 해당하는 항목과 소유권 레지스트리에서 그것이 가진 모든 파티션을 삭제.
    • 각 컨수머는 브로커와 컨수머 레지스트리에 대해 ZooKeeper Watcher를 등록하고 브로커나 컨수머 그룹에서 변화가 일어나면 통지를 받음.
    • 컨수머의 시작이나 컨수머가 브로커/컨수머 변화에 대해 통지를 받으면, 컨수머는 소비할 파티션의 부분집합을 정하기 위해 리밸런스 프로세스를 시작.
      • 브로커와 컨수머 레지스트리를 읽어서 각 구독된 토픽에 대해 사용가용한 파티션의 집합과 그 토픽을 구독하고 있는 컨수머의 집합을 계산.
      • 파티션 집합을 범위로 파티셔닝(range-partition)해서 컨수머의 수 만큼의 청크로 나누고 소유할 청크를 고름.
      • 컨수머가 고른 각 파티션에 대해 소유권 레지스트리에 그 파티션의 새로운 소유자로 스스로를 써넣음.
      • 컨수머는 각 소유한 파티션으로부터 데이터를 당겨오는 쓰레드를 시작. 오프셋 레지스트리에 저장된 오프셋으로부터 읽기 시작.
      • 컨수머는 주기적으로 오프셋 레지스트리에 마지막으로 소비한 오프셋을 업데이트.
    • 그룹 내에 여러 컨수머가 있을 때 각각은 브로커/컨수머 변경에 대해 통지를 받지만, 그 통지가 컨수머마다 살짝 다르게 올 수 있음.
      • 하나의 컨수머가 다른 컨수머에 의해 아직 소유된 파티션에 대한 소유권을 가지려고 하는 것이 가능.
      • 이 경우, 첫번째 컨수머는 단순히 모든 파티션을 릴리즈하고 조금 기다린 후 리밸런스 프로세스를 재시도한다.
      • 리밸런스 프로세스는 단 몇번의 재시도 만에 안정화됨.
    • 새로운 컨수머 그룹이 만들어졌을 때는 오프셋 레지스트리에는 오프셋이 없지만, API를 사용해서 최소 또는 최대의 오프셋부터 시작할 수 있음.

Delivery Guarantees

  • Kafka는 at-least-once 배달만을 보장함.
    • exactly-once 배달은 보통 2PC를 필요로 하고, 카프카가 대상으로 하는 애플리케이션은 필요로 하지 않음.
    • 보통의 경우, 메시지는 각 컨수머 그룹에 정확히 한번만 배달됨. 하지만, 컨수머 프로세스가 정상적으로 종료되지 않고 죽은 경우, 그 컨수머가 소유하고 있던 파티션을 새롭게 소유하게된 컨수머는 ZooKeeper에 커밋된 마지막 오프셋 후의 메시지들에 대해 중복 메시지를 소비하게 됨.
    • 애플리케이션에 있어서 메시지의 중복이 중요한 문제라면, Kafka가 컨수머에게 돌려주는 오프셋을 이용하거나 메시지 내의 어떤 유일하게 식별 가능한 키를 사용해서 자체적인 중복 방지 로직을 추가해야함.
  • Kafka는 하나의 파티션으로부터의 메시지는 컨수머로 차레대로 전달되는 것을 보장. 하지만 서로 다른 파티션으로부터 오는 메시지의 순서는 보장하지 않음.
  • 로그가 망가지는 것을 피하기 위해 각 메시지에 대해 CRC를 저장.
    • Kafka는 CC를 가진 메시지를 삭제하는 리커버리 프로세스를 실행.
    • 이 CRC는 네트워크 에러를 체크할 수도 있게 해줌.
  • 브로커가 죽으면 그것 내에 저장된 소비되지 않은 메시지는 사용불가능해짐.
    • 미래에는 여러 브로커에 메시지를 중첩해서 저장하는 자체적인 복제를  추가할 예정.

Experimental Results

Producer test

producer_performance

Kafka의 Producer 성능이 좋은 이유를 아래와 같은 이유로 설명하고 있습니다.

  • Kafka 프로듀서는 브로커로부터의 ack을 기다리지 않고 브로커가 다룰 수 있는 만큼 빠르게 메시지를 보냄.
    • 배치 크기가  50일 때는 Kafka 프로듀서는 프로듀서-브로커 사이의 1GB 링크에 포화됨. (saturated)
    • ack이 없으므로 발행된 모든 메시지가 실제로 브로커에 의해 받아진 건지 보장할 수 없음.
    • 로그 데이터에 대해서는 영속성을 처리속도와 트레이드를 하는 것이 좋음.
    • 미래에 영속성이 중요한 데이터에 대한 개선을 계획 중임.
  • 효율적인 스토리지 형식
    • 평균적으로 Kafka에서는 각 메시지는 9바이트의 오버헤드를 가짐. 반면, ActiveMQ에서는 144 바이트.
    • ActiveMQ에서의 오버헤드는 JMS가 요구하는 무거운 메시지 헤더에서 옴.
    • 여러 인덱싱 구조를 유지하기 위한 비용도 존재.
  • 배치 방식은 RPC의 오버헤드를 감추어서 (amortize) 처리속도를 크게 개선함.

 

Consumer Test

consumer_performance

Kafka가 좋은 성능을 보여주는 이유를 다음과 같이 설명하고 있습니다.

  • 효율적인 스토리지 포맷
  • ActiveMQ, RabbitMQ는 모든 메시지의 배달 상태를 유지하기 위해 꾸준히 쓰기가 발생하지만, Kafka의 브로커는 소비를 위한 디스크 쓰기가 발생하지 않음.
  • sendfile API의 사용.

Future Works

Kafka가 미래에 지원할 기능으로 우선 영속성을 위한 자체적인 복제 지원을 들고 있습니다. 비동기-동기 복제를 둘다 지원하려고 하고, 애플리케이션의 여러가지 요구에 따라 적절한 중첩 레벨을 선택할 수 있도록 하려 한다고 합니다. 다른 하나는 스트림 프로세싱입니다. 기본적인 레코드 수를 센다든지, 다른 스트림이나 다른 스토리지의 데이터와 join하는 등의 처리가 가능하게 하려고 합니다. 우리는 현재의 Kafka는 스트림 프로세싱을 지원하지 않고, Kafka와 협동하는 다른 프레임워크에 위임하는 방식으로 이루어지고 있다는 것을 알고 있습니다.

Finding a needle in Haystack: Facebook's photo storage

Finding a needle in Haystack: Facebook’s photo storage by Doug Beaver, Sanjeev Kumar, Harry C. Li , Jason Sobel , Peter Vajgel , Facebook Inc, 2010.

최근 몇개월 동안  상대적으로 크기가 작고 많은 파일들을 비교적 단순한 솔루션을 이용해서 효율적으로 저장하는 것에 대해서 관심이 있어서 읽어본 페이퍼이다. 파일들의 개수로 인한 메모리 사용량과 복제 시간 등이 골칫거리라서 커다란 파일에 저장하는 방법을 여러가지로 검토하고 있었는데, 매우 실용적이고 깔끔한 접근으로 문제를 해결하고 있고, 실제로 개발 비용도 적게 (몇개월 정도 걸렸다고 한다) 들었으므로, 2009년 경의 시스템이지만, 서비스를 다루는 엔지니어로서는 배울 점이 있다고 생각한다.  이미지 파일 자체가 불변 데이터이므로 상대적으로 쉬운 문제인 것은 맞지만, 실제로 서비스에 적용하는 것은 또 다른 문제이다. 오픈소스화 되어 있지 않은 것이 아쉬운 점이라고 할 수 있는데 – 실리콘 밸리의 큰 회사들 치고는 의외로 Facebook은 오픈소스에 그렇게 열려있지 않은 문화인 것 같다 –  구현이 비교적 단순하기 때문인지 수많은 Haystack Clone들이 있다. 혹시 좋은 구현을 찾게되면 소개할 기회가 있을지도 모르겠다.

Problem – NFS Approach

Facebook은 NAS와 NFS를 이용하는 사진 스토리지를 구축하고 있었으며 다음과 같은 문제를 인식하고 있었다.

  • 사진을 저장할 때 전통적인 POSIX 파일시스템에서 요구하는 파일 메타데이터는 필요하지 않음.
  • 사진 파일을 읽기 위해서 3번의 디스크 오퍼레이션이 필요하고 메타데이터의 액세스가 병목이 됨.
    • 디렉토리의 blockmap이 너무 커서 효과적으로 캐시되지 않음.
  • Hot한 액세스는 CDN으로 다룰 수 있지만, long tail 성격을 가진 액세스가 있으므로, 캐싱만으로 해결할 수 없음.
  • 대체 기술들도 적합하지 않음.
    • MySQL, Hadoop
    • RAM을 늘리는 것만으로는 비효율적.

Design & Implementation

Haystack은 다음과 같은 목표를 달성해서 NFS approach의 bottleneck을 줄인다.

  • 실제 데이터를 읽기 위한 디스크 오퍼레이션을 단 1번만 필요하도록 한다.
  • 메타데이터를 위한 메모리 사용량을 줄임.

기본적인 접근 방식은 여러 사진을 하나의 커다란 파일에 저장하는 방식을 사용하는 것이다.

전체적인 시스템 구성은 Haystack Store, Haystack Directory, Haystack Cache의 3개의 서브시스템으로 이루어져 있다.

  • Store
    • Persistent storage로 파일시스템 메타데이터를 관리하는 컴포넌트.
    • Logical volume은 서로 다른 장비의 Physical volume들로 구성됨.
  • Directory
    • Logical Volume으로부터 Physical volume으로의 맵핑.
    • 각 사진들이 있는 Logical volume이나 여유공간이 있는 Logical volume을 관리한다.
  • Cache
    • 내부적인 CDN과 같은 역할.

웹서버는 디렉토리를 이용해서 URL을 구성하는데, 특이한 것은 사진의 URL에 Logical volume이나 장비에 대한 힌트 등 사진을 가져오기 위한 정보가 들어가 있다는 점이다.

  • URL은 CDN host/Cache/Machine ID/Logical volume, Photo와 같은 형태인데, CDN 또는 Haystack Cache는 (Logical volume, Photo) 정보만을 이용해서 자신의 캐시로부터 사진을 찾는다.
  • CDN이나 Cache로부터 miss일 경우에는 Cache address를 URL로부터 제거하고 Store 장비에 사진을 요청한다.

Haystack Directory

Haystack Directory는 다음과 같은 4가지 기능을 가지고 있다.

  • Logical volume을 Physical volume으로 맵핑
    • 어떤 Store 장비가 소실된 상황에서는 mapping에 상응하는 엔트리를 삭제하고 온라인이 된 새로운 머신으로 대체.
  • 쓰기 액세스를 여러 로지컬 볼륨에 대해 로드 밸런싱, 읽기 액세스를 여러 피지컬 볼륨에 대해 로드밸런싱
  • 리퀘스트가 CDN으로 핸들링되어야할지 Cache로 핸들링되어야할지 결정
  • 로지컬 볼륨이 읽기전용인지 아닌지에 대한 판단. (운영적인 이유나 용량 한계)
    • 새로운 장비를 추가하면 이 장비는 쓰기가 허용되고, 쓰기가 허용된 장비만 사진의 업로드를 받는다.

Haystack Directory는 복제가 되는 데이터베이스에 정보들을 저장하고 PHP interface로 액세스되며, latency를 줄이기 위해 memcache를 사용한다고 한다.

Haystack Cache

Haystack Cache는 Photo ID를 키로 데이터를 찾을 수 있는 Distributed hash table로 구성되어 있다. 어떤 DHT를 사용하고 있는지에 대해서는 자세히 설명되어 있지 않다. 캐시되어있지 않다면, URL에 명시되어 있는 Store 장비에서 사진을 가져온다. 캐시하는 조건이 조금 특이한데 다음과 같다.

  • 사용자로부터 직접 온 리퀘스트이고 CDN이 아닐 때: CDN에서 miss된 것이 내부 캐시에서 hit될 가능성이 낮음.
  • 쓰기가 허용된 Store 장비로부터 사진을 가져왔을 때: 사진은 업로드된 뒤에 헤비하게 액세스되므로 쓰기가 허용된 장비를 과도한 읽기 액세스로부터 보호.

Haystack Store

  • 전반적인 구조
    • 각 Store 장비는 여러 개의 Physical volume을 관리하고, 각 volume은 수백만 개의 사진을 저장한다.
      • 이를테면, Physical volume은 100GB이고, /hay/haystack…<logical volume id>와 같이 저장됨.
    • Store 장비는 Logical volume의 ID와 file offset을 이용해서 사진을 액세스할 수 있다.
    • Store 장비는 각 Physical volume에 대해 file descriptor를 유지한다.
    • Physical volume은 수퍼블럭과 needle의 sequence로 이루어져 있음.
    • needle을 빠르게 가져오기 위해서 각 볼륨에 대해 In-memory data structure를 유지하고 있음.
      • (key, alternative key) -> (needle’s flags, size in bytes, volume offset)
      • alternative key는 4개의 서로 다른 크기의 사진을 나타냄.
  • 읽기 (Photo Read)
    • 읽기 요청에는 Logical volume ID, Key, Alternative key, Cookie가 있다.
    • 사진이 업로드될 때 cookie는 랜덤하게 생성되어 Directory에 저장된다. 사진의 URL을 추측해서 액세스하는 공격을 막아준다.
    • In-memory 맵핑에서 관련된 메타데이터를 찾고, 지워진 사진이 아니라면 적절한 offset을 seek해서 전체 needle을 읽어들임.
    • 읽어들인 needle로부터 cookie를 검증하고 데이터의 integrity를 체크 후 문제가 없다면 응답.
  • 쓰기 (Photo Write)
    • 쓰기 요청에는 Logical volume id, Key, Alternate key, Cookie, Data가 있다.
    • Physical volume file에 needle image를 동기적으로 append하고, In-memory 맵핑을 업데이트 함.
    • 이미 존재하는 사진의 업데이트는 동일한 key/alternate key로 업데이트된 needle을 append하는 것으로 이루어진다.
      • 새로운 needle이 다른 Logical volume에 쓰여진다면, Directory는 애플리케이션 메타데이터를 업데이트하므로 미래의 읽기 요청은 기존 버전을 읽지 않음.
      • 새로운 needle이 같은 Logical volume에 쓰여진다면, 새로운 needle을 append함.
      • 업데이트로 인해 중복된 needle이 발생할 수 있는데, 하나의 Physical volume에서 높은 offset의 needle이 최신 버전의 needle이다.
  • 지우기 (Photo Delete)
    • In-memory 맵핑과 Volume file에 삭제 플래그를 설정.
    • 삭제된 사진에 대한 읽기 요청은 먼저 In-memory 맵핑의 플래그를 체크하고, 삭제 플래그가 설정되어 있다면 에러.
  • 인덱스 파일
    • 이론적으로는 모든 Physical volume을 읽어서 In-memory 맵핑을 복구할 수 있지만, 꽤 시간이 걸리는 일이므로, 인덱스 파일은 In-memory 맵핑을 빨리 빌드하게 도와주어 Store의 재기동 시간을 줄여줌.
    • 각각의 Physical volume에 대해 인덱스 파일을 유지하는데, In-memory 데이터 구조의 체크포인트라고 생각할 수 있다.
    • 인덱스 파일의 구조는 Physical volume file과 유사하게 수퍼블럭과 인덱스 레코드들의 시퀸스인데, Physical volume file에 나타나는 needle과 동일한 순서임.
    • 사진을 추가할 때 needle은 Physical volume file에 동기적으로 쓰지만, 인덱스 레코드는 비동기적으로 쓴다.
    • 사진을 삭제할 때 Physical volume file의 needle에는 삭제 플래그를 설정하지만, 인덱스 파일은 업데이트하지 않는다.
      • 추가적인 동기적인 디스크 쓰기를 피하고, 쓰기나 삭제가 좀 더 빠르게 끝나도록 해준다.
    • 인덱스 레코드가 없는 needle (orphan)이 발생할 수 있음.
      • 재기동을 할 때 orphan에 해당하는 인덱스 레코드의 추가 작업을 수행.
      • 인덱스의 마지막 레코드는 볼륨 파일의 마지막 orphan이 아닌 needle임.
      • 재기동이 완료되면 인덱스 파일만을 사용해서 In-memory 맵핑을 초기화.
    • 삭제된 사진이지만 인덱스 레코드는 이를 반영하지 않는 경우가 발생할 수 있음.
      • 삭제된 사진을 Physical volume으로부터 가져오더라도 전체 needle을 읽으면 삭제 플래그를 검사할 수 있으므로, 그 때 In-memory 맵핑을 업데이트하고 캐시에 오브젝트가 없다고 응답.
  • 파일시스템
    • 현재는 Extent 기반의 파일시스템인 XFS를 사용.
      • 커다란 파일의 블록맵이 메인메모리에 저장되기 충분할 정도로 작다.
      • 효율적인 파일 preallocation을 제공하므로, fragmentation을 완화하고 블록맵이 커지는 것을 막아준다.
    • XFS를 사용해서 파일 시스템 메타데이터를 읽기 위한 디스크 오퍼레이션을 제거할 수 있다.
    • 예외적으로는 사진 데이터가 extent나 RAID stripe 가장자리에 걸쳐져있는 경우 1번 이상의 디스크 오퍼레이션을 필요로 한다.
      • Haystack은 1GB extent를 preallocation하고 256KB RAID stripe size를 사용하므로 실제로는 그런 경우는 드물다.

3.5. Recovery from failures

  • Detection
    • 적극적으로 문제가 있는 Store 장비를 찾기 위해서, 피치포크(pitchfork)라는 주기적으로 Store 장비의 상태를 체크하는 백그라운드 작업을 실행한다.
    • 각 Store 장비로의 접속을 원격에서 체크하고, Volume file의 가용성과 실제로 데이터를 읽는 것이 가능한지 체크한다.
    • 어떤 Store 장비가 일관되게 체크에 실패한다면  피치포크는 자동적으로 그 Store 장비에 있는 모든 Logical volume을 read-only로 표시한다.
  • Repair
    • 한달에 몇 번 정도 전체 동기화 (bulk sync)를 필요로 하는 상황이 있는데, Store 장비의 데이터를 복제본의 Volume file을 이용해 리셋해야하는 경우다.
    • 전체 동기화되는 데이터의 양은 각 Store 장비의 NIC 속도보다 order of magnitude로 크기 때문에 복구까지 몇시간이 걸릴 수 있다.
      • (나의 계산: 1Gbit하에서 10MB/s (10%)를 쓸 수 있는 상황이라면 10분 내에 복구하려면 10MB x 600sec = 6TB 정도. 1MB x 600 sec이라면 600GB.)

3.6. Optimizations

3.6.1 Compaction

  • 삭제되거나 업데이트에 의해 중복된 needle에 의해 사용되는 공간을 되찾기 위한 과정으로, 중복되거나 삭제된 엔트리를 skip하면서 needle을 새로운 파일로 복사함으로써 Volume file을 compaction한다.
  • Compaction 과정 중에 발생하는 삭제는 원래의 Volume file과 새로 빌드하는 Volume file 양쪽으로 적용된다.
  • 이 과정이 파일의 끝까지 진행되면 추가적인 수정을 블록하고 파일과 In-memory 맵핑을 atomic하게 swap한다.

3.6.2 Saving more memory

  • 플래그는 삭제 용도로만 사용하고 있기 때문에, In-memory record의 offset을 0으로 설정함으로써 삭제를 표현하고 플래그 필드는 제거.
  • 메인 메모리에는 쿠키를 저장하지 않고, 디스크로부터 니들을 읽은 후에만 쿠키를 체크한다.
  • 현재는 사진 당 10 bytes의 메모리를 사용함.
    • 하나의 이미지에 대해 4개의 사이즈를 유지하므로, 키 (8 bytes) + alternate key (4 byte) x 4 + data sizes (2 bytes) x 4 = 32 bytes
    • 해시 테이블에 의해 이미지당 2 bytes의 오버헤드.
    • 리눅스의 xfs_inode_t는 536 bytes임.

3.6.3 Batch upload

  • 디스크는 커다란 sequential write일 경우 성능이 좋으므로, 가급적이면 batch upload를 하려고 노력함.
  • 사용자의 앨범 업로드

4. Evaluations

4.1. Characterizing photo requests

  • 사용자는 매일 수백만개의 사진을 업로드함.
  • 옛날 것보다 최근에 업로드된 사진이 더 POPULAR함.

4.1.1. Features that drive photo requests

News Feed와 album이 98%의 Facebook request임.

  • 빠른 popularity 감소는 CDN/Cache가 popular content의 호스팅에 매우 효율적일 것임
  • Longtail 그래프는 무시할 수 없는 수의 리퀘스트가 캐시로 다룰 수 없음을 의미

4.1.2 Traffic Volume

  • Daily Uploaded: ~120 M
  • Haystack Photo WrittenL ~1.44 B (12배 = 4 sizes, 3 locations)
  • Photo Viewed: 80-100 B
  • Haystack Photo Read: 10 B

4.2. Haystack Directory

Hashing policy는 read-write를 잘 분산하고 있음.

4.3. Haystack Cache

80% 정도의 hit rates

4.4. Haystack Store

4.4.1 Experimental Setup

  • 2U storage blade
  • 2 hyper-threaded quad-core Intel Xeon CPUs
  • 48GB memory
  • hardware raid controller with 256-512MB NVRAM
  • 12 x 1 TB SATA drives
  • RAID-6 partition
    • 9TB of capacity
    • 적절한 redundancy와 뛰어난 읽기 성능, 낮은 스토리지 비용.
    • NVRAM write-back cache가 RAID-6의 write performance 저하를 완화
    • Store 장비에서의 캐싱은 비효율적이므로 NVRAM은 쓰기 용도로만 사용
  • Crash나 Power loss 시의 data consistency를 위해서 disk cache(?)는 disable.

4.4.2 Benchmark Performance

  • Randomio
    • random 64KB reads, direct I/O (sector aligned requests)
    • 읽기 throughput의 베이스라인 설정.
  • Haystress
    • buffer cache의 영향을 줄이기 위해 커다란 이미지 셋에 대한 랜덤 읽기를 사용.
    • Workload A randoms reads to 64KB on Store machine with 201 volumes
      • 770.6 qps, 85% of the raw throughput / 17% higher latency
  • 오버헤드의 원인
    • Haystack은 파일시스템 위에서 동작.
    • 디스크 읽기는 전체 needle을 읽기 위해 필요한 64KB보다 큼.
    • RAID-6 device stripe size에 align되지 않았을 가능성이 있으나, 적은 확률일 것임.
    • 인덱스 액세스, checksum 계산 등으로 인한 CPU 오버헤드.

4.4.3 Production workload

  • Multi-write latencies are very flat and stable
    • NVRAM allows us to write needles async and issue a single fsync to flush the volume file once the multi-write is complete
  • Read performance impacted by
    • number of photos stored on the machine
    • cached in the Cache (for read-only, buffer cache would be more effective)
    • recently written photos are usually read back immediately
  • CPU idle time varies 92-96%