이 논문은 Paxos를 실제로 구현하고자 할 때 고려해야할 현실적인 문제들과 해결방식을 설명하고 있다. Google은 Chubby에서 필요로하는 분산 로그 스토리지를 위해 기존의 상용 솔루션을 대체하는 Paxos를 구현하게 되었다고 한다.
Paxos를 이용한 분산 로그 스토리지의 기본적인 아이디어는 데이터베이스 오퍼레이션에 대해서 Paxos 알고리즘을 반복적으로 적용하면, 복제본들 사이에 데이터베이스 오퍼레이션들의 로그를 동일하게 쌓아올릴 수 있고, 결과적으로 분산 로그 스토리지를 구현할 수 있게 된다. Chubby가 기반하고 있는 분산 DB는 Paxos로 구현된 분산 로그 스토리지에 저장된 데이터오퍼레이션들의 리플레이 로그와, 스냅샷을 관리한다. 흥미로운 점은 분산 로그 스토리지는 다른 분산 시스템을 만들 때에도 강력한 기초가 될 수 있기 때문에, 분산 로그와 분산 DB 레이어 사이를 깔끔한 인터페이스로 정의해서 분산 로그를 재사용할 수 있도록 모듈화를 꾀한 점이다.
이 논문에서는 Paxos 알고리즘에 대한 괜찮은 설명이 나오는데, Multi-Paxos에 대한 언급도 나온다. 그동안 Paxos 알고리즘의 최적화에 해당하는 Multi-Paxos라는 단어를 도입한 것이 이 논문이라고 생각하고 있었는데, 이번에 블로그 글을 정리하다가 다시 찾아보니, 애초에 Paxos의 오리지널 논문인 Lamport의 “Part-time Parliament”에서 Multi-decree parliament라는 알고리즘으로 소개되고, 이를 Prisco, Lampson, Lynch의 “Revisiting the Paxos algorithm”에서 처음으로 “Multi-Paxos”라는 이름을 사용한 것으로 보인다.
Handling disk corruption
디스크 손상에 의한 두가지 상황, 즉 파일의 내용이 바뀌었거나 파일 자체가 없어지는 경우에 대한 탐지 방법을 제시하고 있다. 전자는 각 파일에 대한 checksum을 유지하는 방법으로 해결하고, 후자는 리플리카였음을 나타내는 marker를 GFS에 보관하는 방법으로 디스크 손상을 탐지한다. (이러한 방법과 관련해 역시 유명한 Viewstamped Replication 논문을 인용하고 있으므로 나중에 꼭 읽어보도록 하자.)
디스크 손상이 탐지되었을 경우에는 리빌드를 진행하게 되는데, non-voting member 즉, promise나 acknowledgement 메시지에는 응답하지 않는 노드로서 Paxos에 참가하고 catch-up mechanism을 사용한다. catch-up mechanism 자체에 대해서는 이 논문에는 그리 자세한 설명은 없다.
이렇게 디스크 손상에 대비하는 메커니즘이 있다면 모든 쓰기를 디스크에 바로 flush하지 않아도 되는 최적화가 가능해진다.
Master leases
stale data를 읽지 않도록 보장하기 위해서는, 읽기 오퍼레이션 조차도 Paxos로 실행해서 다른 업데이트들과 직렬화되도록 하는 것이 필요하다. 일반적으로 읽기 오퍼레이션이 대부분의 오퍼레이션을 차지하므로 이는 매우 비효율적이다.
이에 대한 해결책으로 제시하는 것이 마스터 리스이고, 이는 마스터가 리스를 가지고 있는 동안에는 다른 복제본이 Paxos로 업데이트를 진행할 수 없는 것을 보장하기 때문에, 읽기 오퍼레이션을 로컬에서 제공할 수 있게 된다. 그리고, 리스가 만료되기 전에 보통 갱신되기 때문에 마스터가 대체로 항상 리스를 가지고 있게 된다.
Epoch numbers
어떤 복제본으로 업데이트 요청이 왔을 때 마스터의 지위를 잃어버리거나 다시 얻었다면, 그 요청은 중단되어야 하는데, 이를 구분하기 위해 어떤 마스터가 연속적으로 재임하는 동안에는 같은 값으로 유지되는 Epoch number라는 개념을 도입하고 있다. 아마도 Raft 알고리즘의 term과 동일한 개념이 아닐까 싶다.
Group membership
그룹 멤버쉽 문제란 복제본이 추가되거나 제거되는 등 복제본의 집합에 일어난 변화를 어떻게 다루는지에 대한 문제다. 그룹 멤버쉽 문제에도 Paxos를 활용해 해결할 수가 있다는 아이디어를 인용하고 있는데, 아이디어 자체는 단순하나 – 복제본 집합의 변화의 로그를 Paxos로 유지하면 될 것이다 – 디스크 손상 등의 문제들을 고려해서 구현하는 것은 쉽지 않다고 얘기하고 있다.
Snapshots
로그를 무한정 쌓을 수 없으므로 스냅샷을 생성해서 기존의 로그가 필요하지 않게 만드는 메커니즘을 필요로 한다. Paxos 자체는 복제된 로그의 일관성에만 관심이 있지, 복제되고 있는 데이터 구조 자체에 대해서는 인지하지 못하므로, 스냅샷을 생성하는 것은 애플리케이션의 책임이 된다. 따라서, 애플리케이션은 자유롭게 스냅샷을 생성하고, 스냅샷이 생성된 것에 대해서 Paxos 프레임워크 쪽에 알려주어, Paxos 프레임워크가 스냅샷 이전의 로그를 삭제할 수 있도록 한다. 스냅샷은 복제본 사이에 동기화되지 않으며, 각각의 복제본은 독립적으로 스냅샷을 생성할 시점을 결정한다.
이러한 메커니즘은 간단해보이지만, 로그와 스냅샷이 서로 일관성있게 유지되도록 하기 위한 복잡성들이 존재한다.
각각의 스냅샷에 관련된 모든 Paxos 관련 정보 – 스냅샷을 생성하는 시점의 Paxos instance number와 그 시점의 그룹 멤버쉽 – 를 담고 있는 스냅샷 핸들이라는 개념이 존재한다. 스냅샷을 복구할 때는, 애플리케이션은 스냅샷 핸들을 Paxos 프레임워크 쪽에 전달하고, Paxos 프레임워크 이를 이용해 복구를 진행한다.
클라이언트가 스냅샷을 생성하려고 할 때는 먼저 스냅샷 핸들을 요청한다. 클라이언트는 스냅샷을 생성하는데, 이 스냅샷은 스냅샷 핸들의 정보와 일관되어야 하므로 Paxos가 진행됨에 따라서 업데이트라 발생하는 것에 대해서 주의를 기울여야 한다. 마지막으로 스냅샷이 생성되었다면 클라이언트는 Paxos 프레임워크에 스냅샷 핸들을 전달하면서 스냅샷이 생성되었음을 알린다. 스냅샷의 생성에 실패했다면 단순히 Paxos 프레임워크에 이를 알리지 않는 것으로 충분하다.
catch-up을 위해서는 다른 복제본으로부터 최근의 스냅샷을 얻어오고, 나머지 로그 레코드들도 다른 복제본으로부터 얻어온다.
Database transactions
CAS (compare and swap) 오퍼레이션을 atomic하게 유지하기 위해서 CAS에 관련된 데이터를 Paxos 상에서의 하나의 값으로 취급한다. 실제로 데이터베이스 트랜잭션을 구현하지 않더라도, 이러한 방식을 확장함으로써 트랜잭션 스타일의 지원이 가능하다. MultiOp은 이러한 트랜잭션 지원을 위해 atomic하게 적용되고 guard, top, fop의 세가지 부분으로 이루어져 있다. guard라고 불리는 테스트의 리스트가 한가지 부분이고, 이 모든 테스트가 참이라면 데이터베이스 오퍼레이션의 리스트인 top을 실행하고, 그렇지 않다면 역시 데이터베이스 오퍼레이션의 리스트인 fop을 실행한다.
Software Engineering
알고리즘의 표현, 실행시간 일관성 검증, 테스팅, 동시성 등에 관련한 이슈들을 설명하고 있는데, 특별히 흥미로운 테스팅 항목에 대해 살펴보자.
시스템에 랜덤한 실패들을 집어넣더라도 Safety와 Liveness를 만족하는지를 테스트하는데, 테스트가 실패했을 때 이를 재현할 수 있어야 하므로, 테스트가 시작할 때 설정된 시드를 확보해서 동일한 테스트를 재현할 수 있도록 하고 있다.
이외에도 Chubby 시스템이 하위 시스템의 실패에도 잘 동작하는지를 테스트하기 위해서, 분산 로그 시스템에 여러 hook을 구현하고 역시 랜덤하게 실패하도록 했을 때 Chubby 시스템이 그러한 실패들을 잘 극복하는지를 테스트 하기도 한다.
이러한 테스트 방식은 체계적인 방식은 아니나, 최근 분산시스템을 테스트하는 것에 대해서는 일반적인 방법으로 자리잡아 나가고 있는 것 같다.
Summary
이 논문의 마지막 부분은 컴파일러 개발 분야가 매우 복잡하지만, 이론이 널리 알려져있고 yacc, ANTLR 등의 도구와 같이 현실에도 잘 적용되어있는 점을 지적하면서, 분산시스템 분야는 그렇지 않음을 지적하고 있다.
There are significant gaps between the description of the Paxos algorithm and the needs of a real-world system. In order to build a real-world system, an expert needs to use numerous ideas scattered in the literature and make several relatively small protocol extensions. The cumulative effort will be substantial and the final system will be based on an unproven protocol.
The fault-tolerance computing community has not developed the tools to make it easy to implement their algorithms.
The fault-tolerance computing community has not paid enough attention to testing, a key ingredient for building fault-tolerant systems.
References
Paxos는 Paxos made simple을 통해서 공부했었는데, 시간이 날 때 오리지널 페이퍼인 Lamport, Leslie. "The part-time parliament." ACM Transactions on Computer Systems (TOCS) 16.2 (1998): 133-169.를 읽어보는 것이 좋을 듯 하다.
리스에 관해서는 Gray, Cary, and David Cheriton. Leases: An efficient fault-tolerant mechanism for distributed file cache consistency. Vol. 23. No. 5. ACM, 1989.을 참고하자.
그룹 멤버쉽 문제에 관해서 Cristian, Flaviu. "Reaching agreement on processor-group membership in synchronous distributed systems." Distributed Computing 4.4 (1991): 175-187.
인과성을 이용하는 분산 스토리지의 동작에 있어서 논리적 시계 집합에 대한 sync 와 update 2개의 오퍼레이션이 핵심을 이루고 있다고 주장하고 있다.
먼저 sync 오퍼레이션의 경우에는 두 개의 시계 집합을 취해서 두 집합의 원소들인 논리적 시계들 사이에 인과성의 관계가 있다면 이전에 해당하는 시계를 모두 버리고, 남아있는 원소들로 구성된 집합을 반환하는 오퍼레이션이다. 결과적으로 반환되는 집합은 동시적 (concurrent)인 관계에 있는 시계들로만 이루어지게 된다.
sync는 클라이언트와 서버 도는 서버의 노드들 사이의 동기화가 필요한 시점에 논리적 시계에 기반해서 과거의 값들을 버리기 위한 오퍼레이션이라고 볼 수 있다. 여기서 재미있는 것은 sync는 논리적 시계가 실제로 어떻게 구현되어있는지에 상관없이 시계들 사이의 부분순서 (partial order)만을 이용해서 일반적으로 정의할 수 있다는 것이다.
update 오퍼레이션은 어떤 시계 집합 (통상적으로 클라이언트)과 서버의 어떤 노드의 시계 집합, 서버의 식별자를 취하고 하나의 시계를 반환하는 오퍼레이션이다. 이 시계는 클라이언트 시계 집합 내의 모든 시계들을 dominate하고, 시스템 내의 시계들의 어떤 join에 의해서도 dominate되지 않아야 한다. (즉, dominate하거나 concurrent 해야한다.)
인과적인 이력 (causal histories)의 경우, update 오퍼레이션은 다음과 같이 정의할 수 있다. 시스템 전체에서 고유한 사건 식별자를 얻어서 클라이언트 시계 집합의 각 시계에 추가하는 방식이다.
이어서 분산 스토리지의 get/put 오퍼레이션에서 위에서 정의한 sync/update 오퍼레이션을 이용해서 논리적 시계를 어떻게 다루는지에 대해서 설명하고 있으나 여기서는 생략하기로 하자.
Dotted Version Vectors
버전 벡터가 (id, m)의 형태로 표기된다면 dotted version vector는 (id, m, n)과 같이 표기할 수 있다. 버전 벡터가 연속적인 인과적인 이력을 표현하고 있다면, dotted version vector는 그 연속적인 이력에 n에 해당하는 독립적인 사건을 추가한 것을 표현할 수 있다.
예를 들어, {(a,2),(b,1),(c,3,7)}이라는 dotted version vector는 {a1, a2, b1, c1, c2, c3, c7}와 같은 인과적인 이력을 표현하고 있다.
dotted version vector를 인과적인 이력으로 정의했다면, 부분순서가 어떻게 정의되는 지를 살펴볼 때다.
dotted version vector를 인과적인 이력으로 변환해서 생각하면 당연한 결과라고 할 수 있다.
부분순서가 정의되어있으므로 sync 오퍼레이션은 추가적으로 정의할 필요가 없으나, update 오퍼레이션은 dotted version vector에 대해 정의할 필요가 있다.
이 간결한 식에 매우 많은 의미를 담고 있는데, 합집합(union)의 좌항의 경우, 클라이언트의 시계집합(S)에 속하는 노드의 식별자들 중 파라미터에서 제공한 노드의 식별자 (r)가 아닌 것들에 대해서, 각각의 식별자와 클라이언트 시계집합에서 해당 식별자에 대한 가장 큰 sequence를 pair로 하는 버전 벡터들을 나타낸다. 우항의 경우에는 파라미터에서 제공한 노드의 식별자(r)에 대한 dotted version vector를 구성하고 있는데, 좌항의 경우와 유사하게 첫번째 정수는 클라이언트의 시계집합에서 해당 식별자에 대한 가장 큰 sequence가 된다. 양쪽 모두 클라이언트의 문맥을 표현하는 것이라고 볼 수 있다. 두번째 정수는 조금 특이한데, 파라미터에서 제공한 노드의 시계 집합에서 역시 해당 노드의 식별자에 대한 가장 큰 sequence를 얻은 후 1만큼 증가시켜준 값으로 설정된다. 이는 노드 상의 문맥에서 업데이트로 인한 새로운 사건을 기록한 것이라고 볼 수 있다.
이러한 update 오퍼레이션을 그대로 적용하면 다음과 같이 아름답게 움직이는 시스템이 된다.
논문에서는 dotted version vector에 대한 correctness에 대해서 설명하고 있는 듯 하나 생략하도록 한다. 이 논문을 여러번 다시 읽어보았지만 dotted version vector가 어떻게 문제를 해결하는지에 대한 직관적인 설명을 하는 것이 아직도 어려운 것 같다.
Related Work
Lamport clock
L. Lamport, “Time, clocks and the ordering of events in a distributed system,” Communications of the ACM, vol. 21, no. 7, pp. 558–565, Jul. 1978.
http://blog.lastmind.net/archives/720
Version vector
D. S. Parker, et al., “Detection of mutual inconsistency in distributed systems,” Transactions on Software Engineering, vol. 9, no. 3, pp. 240–246, 1983.
Vector clock
C. Fidge, “Timestamps in message-passing systems that preserve the partial ordering,” in 11th Australian Computer Science Conference, 1989, pp. 55–66.
http://blog.lastmind.net/archives/736
F. Mattern, “Virtual time and global clocks in distributed systems,” in Workshop on Parallel and Distributed Algorithms, 1989, pp. 215–226.
Dynamic creation and retirement of vector entries
R. A. Golding, “A weak-consistency architecture for distributed information services,” Computing Systems, vol. 5, pp. 5–4, 1992.
K. Petersen, et al., “Flexible update propagation for weakly consistent replication,” in Sixteen ACM Symposium on Operating Systems Principles, Saint Malo, France, Oct. 1997.
P. S. Almeida, et al., “Interval tree clocks,” in Proceedings of the 12th International Conference on Principles of Distributed Systems, ser. OPODIS ’08. Berlin, Heidelberg: Springer-Verlag, 2008, pp. 259–274.
Scalability problems
B. Charron-Bost, “Concerning the size of logical clocks in distributed systems,” Information Processing Letters, vol. 39, pp. 11–16, 1991.
D. H. Ratner, “Roam: A scalable replication system for mobile and distributed computing,” Ph.D. dissertation, 1998, uCLA-CSD-970044.
R. Prakash and M. Singhal, “Dependency sequences and hierarchical clocks: Efficient alternatives to vector clocks for mobile computing systems,” Wireless Networks, pp. 349–360, 1997, also presented in Mobicom96.
P. Mahajan, S. Setty, S. Lee, A. Clement, L. Alvisi, M. Dahlin, and M. Walfish, “Depot: Cloud storage with minimal trust,” in OSDI 2010, Oct. 2010.
D. Malkhi and D. B. Terry, “Concise version vectors in winfs,” in DISC, ser. Lecture Notes in Computer Science, P. Fraigniaud, Ed., vol. 3724. Springer, 2005, pp. 339–353.
V. Ramasubramanian, et al., “Cimbiosys: a platform for contentbased partial replication,” in Proceedings of the 6th USENIX symposium on Networked systems design and implementation. Berkeley, CA, USA: USENIX Association, 2009, pp. 261–276.
F. J. Torres-Rojas and M. Ahamad, “Plausible clocks: constant size logical clocks for distributed systems,” Distributed Computing, vol. 12, no. 4, pp. 179–196, 1999.
Trade-off
B. B. Kang, R. Wilensky, and J. Kubiatowicz, “The hash history approach for reconciling mutual inconsistency,” in Proceedings of the 23nd International Conference on Distributed Computing Systems (ICDCS). IEEE Computer Society, 2003, pp. 670–677.
AJOUX, Phillipe, et al. Challenges to Adopting Stronger Consistency at Scale. In: 15th Workshop on Hot Topics in Operating Systems (HotOS XV). USENIX Association. (pdf, slides)
최근 geo-replicated data store에서 높은 consistency를 제공하기 위한 연구들이 많이 있다고 한다. – 이 논문에서 나열하고 있는 그런 논문들의 목록도 흥미로운데 이는 마지막에 한번 정리해보자. – Facebook에서도 사용자들에게 – 특히, 읽기에 대한 – end-to-end consistency를 제공하기 위해서 이러한 연구 결과들을 도입하고 싶으나, 여러가지 이유로 이것이 쉽지가 않은데, 그 이유가 무엇인지 일단 알아보자…라는 것이 이 논문의 취지.
There have been many recent advances in distributed systems that provide stronger semantics for geo-replicated data stores like those underlying Facebook. These research systems provide a range of consistency models and transactional abilities while demonstrating good performance and scalability on experimental workloads. At Facebook we are excited by these lines of research, but fundamental and operational challenges currently make it infeasible to incorporate these advances into deployed systems. This paper describes some of these challenges with the hope that future advances will address them.
Facebook은 확장성과 효율을 위해서 샤딩, 복제, 캐싱 등을 아주 많이 활용하고 있는 것으로 보인다. Sodcial graph 시스템을 예로 들고 있는데, 하나의 node나 edge가 MySQL(!)에 쓰여지면, 이것이 비동기적으로 다른 데이터 센터로 복제되고, 여러 계층의 캐시들을 업데이트하고, pub-sub 시스템에 의해 검색이나 뉴스피드를 위한 다른 시스템으로 전달된다. 당연한 이야기지만, Facebook 내의 여러 기능에 해당하는 시스템들은 효율을 위해 각자 특별한 샤딩 방식이나 캐시, 인덱스들을 유지해야한다. Twitter (http://blog.lastmind.net/archives/664)나 LinkedIn (http://blog.lastmind.net/archives/657)의 시스템과 같이 Primary Data Store (DB)로부터 비동기적인 복제를 통해 Secondary Index를 생성하거나 다른 서비스로 이벤트를 전달하는 방식은 현재의 대규모 인터넷 서비스에서는 매우 공통적인 부분으로 보인다.
Facebook relies on sharding, data replication, and caching to efficiently operate at scale. When a node or edge is added to the social graph, it is first written to a single MySQL instance. From there it is asynchronously replicated to all of our data centers, updated or invalidated in multiple layers of cache, and delivered through a scalable publisher-subscriber system to other services such as Search and News Feed. No single data placement strategy can efficiently serve all workloads in a heavily sharded system, so many of these services choose a specialized sharding function and maintain their own data store, caches, and indexes.
Fundamental Challenges
Integrating Across Stateful Services
Facebook의 Social graph는 여러 서비스들을 위한 시스템에 각 서비스를 위한 형태로 복제되고 있다고 한다. 문제는, 대부분의 연구 결과들에서 나타나는 설계들은 단일한 서비스를 상정하고 있다는 것이다. Facebook는 그러한 여러한 서비스가 따로 사용자에게 제공되기 보다는 하나의 서비스처럼 제공되므로 여러가지 문제들이 발생하게 된다.
Facebook’s architecture of cooperating heterogeneous services is different from the monolithic service that most research designs assume. This difference alone is not fundamental—Facebook can be externally viewed as a single service even though internally it is comprised of many services.
Storing Data Consistently Across Services
Facebook의 각 서비스들은 Social graph의 일부에 대한 캐시, 인덱스, 복제본 등을 가지고 있다. Wormhole이라는 Facebook의 pub-sub 시스템은 업데이트를 비동기적으로 여러서비스들에 전달해주고 있다.
In a sharded and scaled environment like Facebook’s, services may maintain their own optimized cache, index, or copy of a portion of the social graph. Wormhole [40] is our scalable pub-sub system that asynchronously delivers update notifications to all of the services that manage state.
이 때, 만약 높은 consistency를 제공하기 위해서는 동기적으로 업데이트 통지를 전달할텐데, availability와 latency를 떨어뜨리는 요인이 된다. 인과성을 위한 시스템을 사용하더라도 여러 서비스들 사이의 의존관계를 표현하는 것은 굉장한 복잡도를 가져오게 된다.
To provide strong consistency across our services, we would need to propagate update notifications synchronously impacting availability and latency. A causal system could avoid atomic updates, but would still need to be integrated into every service and threaded through every communication channel greatly increasing the complexity of the dependency graph.
Decentralized Access to Services
클라이언트는 한 화면에서 표시되는 여러 컨텐츠를 별도의 리퀘스트를 통해 가져와서 클라이언트에서 통합한다. 이 때문에 클라이언트는 일관되지 않은 결과들을 가지고 session-based guarantee를 보장함으로써 사용자에게 일관된 상태를 보여주어야 한다.
The Facebook web site and its mobile applications use parallelism and pipelining to reduce latency. The notification bar and the central content are fetched by separate requests, for example, and then merged at the client. Mobile applications also aggressively cache results using local storage, and then merge query results with previously seen values. […] but it leaves the user’s device as the only safe location to define session-based guarantees or demarcate isolation boundaries.
단일한 사용자는 보통 동일한 지역의 동일한 클러스터로 라우팅 되므로 이러한 성질은 사용자가 inconsistency를 경험할 확률을 낮추어준다. 하지만, 그러한 성질이 보장되는 것은 아니므로 역시 문제는 있다.
Requests for a single user are usually routed to the same cluster in the same region. This routing stability assists in reducing the number of user-visible inconsistencies, because most of the variance in asynchronous update propagation is inter-region.
Composing Results
조금 설명이 애매하기는 한데, 여러 서비스로부터의 결과를 조합해야할 경우, 그 사이의 의존관계를 다루어야 하는데, 각 서비스는 각자의 서비스를 위한 일부의 결과를 가지고 있기 때문에, 이를 일반적으로 처리하기가 쉽지 않다는 의미인 듯 하다.
Each service can be thought of as implementing its own limited query language, so it is not clear that the dependencies can be captured in a way that is both sufficiently generic and sufficiently precise.
Query Amplification
사용자 입장에서는 하나의 쿼리지만, 내부적으로는 여러 서비스들에 대한 수천개의 쿼리로 실행될 수 있다는 이야기.
For example, a user’s home page queries News Feed to get a list of stories, which depends on a separate service to ensure story order of previously seen content. These stories are then fetched from TAO to get the content, list of comments, likes, etc. In practice, this behavior produces fork/join query patterns that have fan-out degrees in the hundreds and critical paths dozens deep.
Slowdown Cascades
동일한 데이터가 각각의 서비스에서는 서로 다른 샤딩 방식과 인덱싱을 통해 저장되므로, 서로 다른 서비스들의 샤드 사이에 all-to-all ordering 의존 관계가 발생하고, 높은 consistency를 요구하는 시스템에서는 하나의 서비스에서 하나의 샤드만 실패 또는 지연되더라도 이에 의존하는 모든 다른 서비스의 모든 샤드에 영향을 줄 수 있다는 이야기.
Different services use different sharding and indexing schemes for the same data. For instance, TAO shards data based on a 64-bit integer id, News Feed shards based on a user id, and the secondary index system shards based on values. The use of different sharding schemes creates all-to-all ordering dependencies between shards in different services. This connectedness accelerates slowdown propagation in strongly consistent systems. A failure in one shard of one service would propagate to all shards of a downstream service.
Latency Outliers
여러 쿼리들을 조합해야하는 경우, 하나의 쿼리만 느리더라도 최종적인 응답에 영향을 미칠 수 있음. 이것이 strong consistency와 직접적인 연관성이 있는지는 명확하게 이해하기 어려우나, strongly consistent system의 응답속도나 outlier들이 그렇지 않은 시스템들에 비해 더 높기 때문에 언급되는 것 같음.
Parallel subqueries amplify the impact of latency outliers. A user request does not return until it has joined the results of all of its queries, and thus must wait for the slowest of its internal requests.
Linchpin Objects
데이터의 개수도 많고 매우 자주 읽히는데다 쓰기도 많은 무언가를 Linchpin Object라고 부르고 있다. 흔히 Facebook의 연예인 페이지나 유명 대기업의 페이지를 상상하면 될 것 같다. 역시 좋은 성능을 제공하는 것이 strongly consistent system에서는 중요하다고 얘기하고 있다.
The most frequently read objects are often also frequently written to and highly connected. Examples of these linchpin objects include popular users like celebrities, popular pages like major brands, and popular locations like tourist attractions. […] A system that provides stronger consistency must provide good performance for these linchpin objects.
Net Benefit to Users
stronger consistency는 프로그래머가 프로그램의 정확성에 대해 생각하기 편리하고, 사용자에게 일관성을 제공한다는 측면에서 이점을 가지고 있지만, 이를 정량적으로 측정하기는 어려울 뿐만 아니라, 그러한 이점들이 단점들 – 커뮤니케이션 오버헤드, 무거운 상태관리, 응답속도의 증가 – 을 넘어서지 않을지도 모름.
Systems with strong consistency are easier for programmers to reason about and build on top of [6, 12] and they provide a better experience for users because they avoid some anomalous application behavior. However, these benefits are hard to quantify precisely, and they do not appear in isolation. When all aspects of user experience are considered it might even be the case that the benefit does not outweigh the detriment. […] Stronger properties are provided through added communication and heavier-weight state management mechanisms, increasing latency. Although optimizations may minimize these overheads during failure-free execution, failures are frequent at scale. Higher latency may lead to a worse user experience, potentially resulting in a net detriment.
Operational Challenges
Fully Characterizing Worst-Case Throughput
여러 서비스가 얽혀있는 복잡한 시스템에서 실패나 복구 방법에 대해 미리 예측하거나 관찰하기 쉽지 않을 뿐더러 이를 정형화하기도 쉽지 않다는 얘기.
While there is no theoretical impediment to preserving worst-case throughput despite strengthening Facebook’s consistency guarantees, our experience is that failure and recovery behavior of a complex system is very difficult to characterize fully. Emergent behavior in cross-service interactions is difficult to find ahead of time, and may even be difficult to identify when it is occurring
Polyglot Environment
(아마도 백엔드) 서비스 내에서 C++이 가장 보편적으로 사용되는 언어인 것은 매우 신기함.
While C++ is the predominant language for standalone services at Facebook, Python, Java, Haskell, D, Ruby and Go are also supported by Thrift, our inter-service serialization format. The application code coordinating service invocations is generally written in Hack and PHP, and final composition of query results may be performed by JavaScript in the browser, Objective C/C++ on IOS, or Java on Android.
여러 언어를 사용하고 있기 때문에 인터페이스나 쓰레딩 모델 등에 대해 가정하는 것이 어렵다는 이야기.
Ports or foreign function interfaces must be provided to give all of these codebases access to an end-to-end consistency platform. Perhaps the trickiest part of this multi-language support is the wide variety of threading models that are idiomatic for different languages and runtimes, because it is likely the underlying consistency system will need to initiate or delay communication.
Varying Deployment Schedules
낮은 레벨의 서비스들은 매우 보수적인 deployment 프로세스를 가지고 있으므로, 이에 관련한 시스템을 빠르게 개발하는 것은 쉽지 않을거라는 이야기.
Facebook has an extremely aggressive deployment process for the stateless application logic, but we are necessarily more conservative with stateful and mature low level services. An inter-service coordination mechanism that is used by or interposes on these low-level services will have a deployment velocity that is constrained by the release engineering requirements of the most conservative component.
Reduced Incremental Benefit in a Mature System
consistency에 관해 이미 적용된 workaround들이 있으므로, 이를 일반적으로 개선하는 시스템의 이득이 떨어진다는 이야기.
While these workarounds would not exist in a newly built system, their presence in Facebook reduces the incremental benefits of deploying a generic system for stronger consistency guarantees.
Related Work
Prior Facebook Publications
memcache cache
B. Atikoglu, et al. Workload analysis of a large-scale keyvalue store. In SIGMETRICS, 2012.
R. Nishtala, et al. Scaling memcache at facebook. In NSDI, 2013.
TAO graph store
N. Bronson, et al. Tao: Facebook’s distributed data store for the social graph. In USENIX ATC, 2013.
Wormhole pub-sub system
Y. Sharma, et al. Wormhole: Reliable pub-sub to support geo-replicated internet services. In NSDI, May 2015
a characterization of load imbalance in the caches
Q. Huang, et al. Characterizing load imbalance in real-world networked caches. In HotNets, 2014.
an analysis of the messages use case
T. Harter, et al. Analysis of hdfs under hbase: A facebook messages case study. In FAST, 2014.
understanding and improving photo/BLOB storage and delivery
D. Beaver, et al. Finding a needle in haystack: Facebook’s photo storage. In OSDI, 2010. Q. Huang, et al. An Analysis of Facebook Photo Caching. In SOSP. ACM, 2013.
S. Muralidhar, et al. f4: Facebook’s warm blob storage system. In OSDI, 2014.
L. Tang, et al. RIPQ: Advanced Photo Caching on Flash For Facebook. In FAST, Feb. 2015.
Systems with Stronger Semantics
replicated state machines
L. Lamport. Time, clocks, and the ordering of events in a distributed system. Comm. ACM, 21(7), 1978.
F. B. Schneider. Implementing fault-tolerant services using the state machine approach: a tutorial. ACM Computer Surveys, 22(4), Dec. 1990.
causal and atomic broadcasts
K. P. Birman and R. V. Renesse. Reliable Distributed Computing with the ISIS Toolkit. IEEE Comp. Soc. Press, 1994.
systems designed for high availability and stronger consistency
K. Petersen, et al. Flexible update propagation for weakly consistent replication. In SOSP, Oct. 1997.
scalable causal consistency for datacenter-scale and/or geo-replicated services
S. Almeida, et al. Chainreaction: a causal+ consistent datastore based on chain replication. In EuroSys, 2013.
P. Bailis, et al. Bolton causal consistency. In SIGMOD, 2013.
J. Du, et al. Orbe: Scalable causal consistency using dependency matrices and physical clocks. In SOCC, 2013.
J. Du, et al. Gentlerain: Cheap and scalable causal consistency with physical clocks. In SOCC, 2014.
W. Lloyd, et al. Don’t settle for eventual: Scalable causal consistency for wide-area storage with COPS. In SOSP, Oct. 2011.
W. Lloyd, et al. Stronger semantics for low-latency geo-replicated storage. In NSDI, Apr. 2013.
strong consistency for datacenter-scale and/or geo-replicated services
J. C. Corbett, et al. Spanner: Google’s globally distributed database. In OSDI, Oct. 2012.
R. Escriva, et al. HyperKV: A distributed, searchable key-value store for cloud computing. In SIGCOMM, 2012.
L. Glendenning, et al. Scalable consistency in Scatter. In SOSP, Oct. 2011.
C. Li, D. Porto, A. Clement, J. Gehrke, N. Preguic¸a, and R. Rodrigues. Making geo-replicated systems fast as possible, consistent when necessary. In OSDI, Oct. 2012.
J. Ousterhout, P. Agrawal, D. Erickson, C. Kozyrakis, J. Leverich, D. Mazi`eres, S. Mitra, A. Narayanan, M. Rosenblum, S. M. Rumble, E. Stratmann, and R. Stutsman. The case for ramcloud. ACM SIGOPS Operating Systems Review, 43(4), 2010.
D. B. Terry, V. Prabhakaran, R. Kotla, M. Balakrishnan, M. K. Aguilera, and H. Abu-Libdeh. Consistency-based service level agreements for cloud storage. In SOSP, 2013.
transactions for datacenter-scale and/or geo-replicated services
P. Bailis, et al. Scalable atomic visibility with ramp transactions. In SIGMOD, 2014.
J. Baker, et al. Megastore: Providing scalable, highly available storage for interactive services. In CIDR, Jan. 2011.
J. C. Corbett, et al. Spanner: Google’s globally distributed database. In OSDI, Oct. 2012.
T. Kraska, et al. Mdcc: Multi-data center consistency. In EuroSys, 2013.
W. Lloyd, et al. Stronger semantics for low-latency geo-replicated storage. In NSDI, Apr. 2013.
S. Mu, et al. Extracting more concurrency from distributed transactions. In OSDI, 2014.
Y. Sovran, et al. Transactional storage for geo-replicated systems. In SOSP, Oct. 2011.
A. Thomson, et al. Calvin: fast distributed transactions for partitioned database systems. In SIGMOD, May 2012.
C. Xie, et al. Salt: combining acid and base in a distributed database. In OSDI, 2014.
Y. Zhang, et al. Transaction chains: achieving serializability with low latency in geo-distributed storage systems. In SOSP, pages 276–291. ACM, 2013.
Discussions of Challenges to Stronger Consistency
criticism of enforcing an order in a communication substrate instead of end to end
D. R. Cheriton, et al. Understanding the limitations of causally and totally ordered communication. In SOSP, Dec. 1993.
slowdown cascades to motivate enforcing only an explicit subset of causal consistency
P. Bailis, et al. The potential dangers of causal consistency and an explicit solution. In SOCC, 2012.
detail the importance and challenges for tail latency in high-scale services at Google
J. Dean, et al. The tail at scale. Comm. ACM, 56(2), 2013.
My Opinion
Strongly consistent system의 연구자들에게 방향을 제시하기 위해 Facebook의 시스템이라는 도메인 내에서 발생하는 숙제들을 정의하고 있는 논문이라고 볼 수 있는데, 마치 Facebook에서 strongly consistent system을 일반적으로 도입해서는 안되는 이유들을 정리해놓은 듯한 느낌이 들었다. 오히려 나의 관심은 Facebook에서 데이터의 복제에 관한 시스템과 복제본 사이의 consistency 문제를 어떻게 다루고 있는가 였는데, 우리가 고심하고 있는 문제들이나 해결 방향이 상당히 닮아 있다는 인상을 받았다. 여러 서비스에 걸친 업데이트 통지를 위해 인프라로서 pub-sub을 적극적으로 활용하고 있는 점이나, 서비스별로 소셜 그래프의 별도 인덱스나 캐시 등을 잘 구축해서 활용하고 있는 점은 잘하고 있다고 생각했다. Facebook의 graph storage인 TAO, pub-sub 시스템인 Wormhole 등에 대해 관심이 생겼다.
Facebook의 VP of Engineering인 Jay Parikh와의 자동화에 관한 인터뷰.
하드웨어의 실패에 따른 자동화된 진단과 복구를 위한 FBAR라는 자동화 시스템에 대해 이야기하고 있다. FBAR에 대해서는 조금 오래된 글 (2011년)이지만, 다음 글에서 조금 더 자세한 내용을 볼 수 있다. https://www.facebook.com/notes/facebook-engineering/making-facebook-self-healing/10150275248698920
We built a system we call FBAR, for Facebook Auto Remediation, to do a very basic set of hardware remediation tasks. Before, if a server had a hard drive failure or some hardware error, an alarm would go off and some human would have to log in, or walk to the computer, and try to debug or fix it. You’d do some things to try to fix it in software, you’d reboot the machine, you might try to reimage it. A lot of that software remediation and debugging is all automated now. No person has to be involved with that. The system will detect the error– it could be a disk drive, it could be a CPU, it could be a networking card or power failure—and can go ahead and do a bunch of things that it knows how to do.
당연한 이야기이지만, 자동화의 이점 중 하나는 휴먼 에러를 방지하는 것이다.
You don’t have to worry about that with automation, because work consistently gets done in the same way every time. And cluster turn-ups which used to take three or four months now can be done in the course of a week – sometimes in just a couple of days.
자동화를 통한 효율에 관한 이야기인데, 일반적인 IT 회사에서 200~500대의 서버당 1명이 일하고 있다면, Facebook에서는 25,000대당 1명의 효율을 가지고 있다고… 예전에도 들은 바가 있는 것 같지만, 2000대를 1명이 담당하는 정도가 되니 불안함을 느꼈던 것을 생각하면 대단한 것 같기는 하다.
So we have done a lot of work on the software automation side of this, to the point that we only need one technician in the data center for every 25,000 servers. That is a ratio that is basically unheard of. Most IT shops have ratios of one to 200, or one to 500.
이 인터뷰에서 자동화의 이점으로 가장 강조하고 있다고 느끼는 점인데, 상당히 단순한 부분들을 자동화하고 나면, 똑독한 사람들이 더욱 높은 수준의 일을 할 수 있고 미래에 대해서 생각할 수 있도록 한다는 것이다. 그리고, 그런 똑똑한 사람들이 오랜 시간에 걸쳐 평범한 일을 하게 된다면 결국 번아웃과 불행함으로 이어지고, 결국 이직을 하게 마련인데, 업무가 지루하고 반복적인 단순 업무가 되지 않도록 지속적으로 자동화를 하고 아키텍쳐를 바꿔나가야한다고 얘기하고 있다.
I think one of the major ways you do this is by continuing to keep them out of their comfort zone. If they end up doing “humdrum” work for a long time, they’re not learning anything, yet they’re spending a lot of time doing it. That leads to burnout and unhappiness, and then they’re going to go somewhere else. So, I think, if growing your company depends on keeping these brilliant technologists engaged, the necessity is that you have to keep automating and rearchitecting your systems so that things don’t become boring, monotonous, and repetitive. Automation serves your talent objectives.
변화를 위한 조직 구조에 대해 – 변화를 위한 팀과, 안정적이고 효율적인 운영을 위한 팀을 분리하는 것이 전통적인 접근.
Otherwise, you put them in a position where one team, in order to hit its goals, always wants to make changes, and another team only wants to make everything stable and cost-effective. Those two are completely opposed to each other. […] When I showed up at Facebook in 2009, this is what I saw and I thought is was perfectly reasonable. It was this way when I was at Akamai, it was this way when I got to Ning.
문제는 단기적인 비용에 기반해 의사결정을 하기 때문에, 미래를 위한 투자를 하지 않게 되고, 자동화는 작은 팀의 임무로 맡겨지기 때문에, 수많은 엔지니어들의 요구에 따라가지 못하게 되는 것.
Sometimes we were making decisions that were short-term cost based, when we should have been basing them on what we would need to be ready for a year from now. In other places, the focus on automation wasn’t there, because it wasn’t a team’s own responsibility. Questions about automation were being tossed to a small team of people who just weren’t going to keep up with this swarm of engineers that we were hiring.
Facebook에서는 이를 같은 팀에 묶고 변화와 운영을 동시에 하도록 하고 있다. 이 또한 쉬운 일은 아닌데, 인터럽트들이 장기적인 목표에서 벗어나도록 하기 때문. 데드라인에 대해서 조금 유연하게 대처할 필요가 있다고 얘기하고 있다.
No, we come up with what we call big bets as a team, planning the investments in technology that will take one or two or three years to build. For example, we built a new compiler that runs the front end of our site. It took us a couple of years to build and that R&D effort was done in parallel, in the same team that was maintaining the existing run time that was running the live Facebook.
이러한 방식의 장점에 대해서, 혁신을 위한 팀은 변화에 대한 성과에 대해서 걱정하고, 전선에서 뛰는 비즈니스 팀은 언제쯤 재미있는 일을 할 수 있는지에 대한 불만을 표하는 상황 등을 방지할 수 있다고 얘기하고 있다.
But meanwhile there are so many benefits — starting with the fact that it’s done in a much more open way. No one likes it when there’s a team working in some secure undisclosed location “doing something really cool that is going to replace what you’ve got.” It’s a tough thing to manage on both ends. The innovation team worries: “We’re not making any impact now – they’ve just stuffed us in a corner and told us go deliver something great.” And the core business team you’re depending on to deal with problems and customer support issues and all the urgent things that come up, is saying, “When do we get to work on something cool? Are we second-class citizens or something?”
의견
자동화의 이점에 대해서는 100% 동감하고 적어도 IT 업계의 일정 규모 이상의 회사라면 어떤 조직이라도 항상 중요한 목표가 되어야 한다고 생각한다. 변화팀과 운영팀을 통합하는 조직 구조에 대해서는 전반적으로는 동의하나, 아마도 VP 레벨의 입장에서는 통합된 조직일지는 몰라도 실무 수준에서는 이 글에서도 제시하듯이 인터럽트와 단기적인 비용에 기반한 요구들은 장기적인 관점에서의 업무에 매우 해롭기 때문에, 일정 규모 이상의 조직이라면, 정도의 차이는 있더라도 팀 내에서라도 어느 정도의 분리는 필요하지 않는가라고 생각한다. 다만, 변화팀과 운영팀이 서로 얼굴을 맞대고 이야기할 수 없을 정도로 서로 다른 조직의 바운더리로 분리되어 있다면, 이 글에서 지적된 것과 같은 부정적인 효과는 항상 발생할 것이다.
Dynamo, Cassandra, Riak, Voldemort와 같은 시스템들은 쓰기 가용성 (write availability)을 보장하기 위해, 어떤 하나의 데이터 항목의 여러 복제본이 동일한 데이터 항목의 복제본이 서로 다른 값으로 갈라질 (diverge) 수 있고, 이를 나중에 수리(repair)하는 방법을 고안하고 있다. 이 때, 복제본 버전들을 비교해서 어느 한쪽이 새로운 업데이트라서 다른 복제본들을 교체할 수 있는지, 아니면 동시적 (concurrent)이라서 한지를 semantic reconciliation을 필요로하는 지를 결정할 수단을 필요로 한다. 이러한 비교는 업데이트 사이의 인과적인 의존성 (causal dependency)에 따라 판단하는데, 인과적인 이력 (Causal History)을 모두 기록하는 것은 비효율적이므로, 이 정보를 요약하는 동시에 인과적인 의존성을 따질 수 있는 수단으로 람포트 시계(Lamport Clock)나 버전 벡터(Version Vector) 등의 개념들이 제안되고 실제로 위와 같은 시스템들에서 활용되고 있다.
이 논문에서 도입하고 있는 Dotted Version Vector는 인과적인 이력을 요약하기 위한 이러한 수단들이 가진 정확성(Correctness)의 한계를 개선하고, 동시에 확장성(Scalability)을 가지는 해결 방법으로서 제시되고 있다.
우선 이번 글에서는 논문의 전반부에 소개된, 인과성의 추적을 위해 사용되는 여러 방법들과 한계들에 대해서 알아보도록 하자.
1. 인과적인 이력 (Causal Histories)
우리가 다루고자 하는 클라이언트-서버로 구성된 스토리지 시스템에서, 클라이언트가 서버로 어떤 작업을 요청할 때마다 인과적인 관계가 발생한다. 이러한 관계를 formal하게 표현하기 위한 방법 중 하나가 인과적인 이력 (Mattern, 1994)이다. 인과적인 이력은 업데이트라는 사건들에 대한 유일한 식별자(identifier)의 집합으로 표현할 수 있다. 이 때 업데이트 사건의 식별자는 노드(리플리카 또는 클라이언트)의 식별자와 단조증가하는 카운터를 합쳐서 만들 수 있다. 업데이트가 발생할 때마다 새로운 식별자가 식별자의 집합에 추가하게 되고, 이 집합의 포함관계를 비교해서 인과성의 부분순서(partial order)를 추적할 수 있다. 두 집합이 서로를 포함하고 있지 않다면 동시적이라고 할 수 있다.
인과적인 이력은 개념적으로는 단순하지만, 업데이트에 따라서 식별자의 집합이 선형적으로 커지게 되므로, 실용적인 시스템에 사용하는 것은 적절하지 않다.
2. 인과적으로 호환되는 완전 순서 (Causally compliant total order)
업데이트들 사이의 인과적인 의존성과 호환되는 완전 순서를 정할 수 있다면, 이를 이용해 last writer wins 정책을 적용할 수 있다. 다른 방식들과 달리, 복제본 노드 (Replica Node)는 하나의 값만 유지하면 되고, 쓰기를 위해 읽기 문맥 (get context)에 해당하는 정보를 제공할 필요가 없기 때문에 매우 단순한 시스템을 얻을 수 있다는 장점이 있다. 문제는 이러한 완전 순서는 인과적인 의존성과 호환되지만, 동시적인 (concurrent) 업데이트들도 정렬해버리기 때문에, 어떤 동시적인 업데이트는 last writer wins 정책에 의해 잃어버리게 된다는 것이다.
2.1. 물리적인 시계
클라이언트들의 시계들이 잘 동기화된다면, 업데이트들을 물리적인 시계의 시간 순서에 따라 정렬함 (동시에 일어난 사건은 프로세스 ID를 이용해 정렬)으로써 완전 순서를 얻을 수 있다. 물리적인 시계에 기반한 완전 순서를 사용하는 방식은 Cassandra 0.6.x나 Dynamo에서 일부 애플리케이션에 대해 버전 벡터의 대안으로 사용되었다고 한다.
물리적인 시계에 기반할 경우의 문제점은 역시 클라이언트들의 시계들이 동기화에서 벗어날 때 발생한다. 시계가 느린 복제본 노드나 클라이언트는 항상 동시적인 업데이트 사이의 경쟁에서 지기 때문에, 항상 동시적인 업데이트를 잃어버리는 문제가 발생한다.
2.2. 램포트 시계 (Lamport Clock)
예전의 글에서 소개했듯이 램포트 시계는 역시 완전 순서를 위해 사용될 수 있는 논리적인 시계의 작동 방식을 제공하고 있다.
3. 서버별 항목을 가진 버전 벡터 (Version vectors with per-server entry)
서버별 항목을 가진 버전 벡터를 이용해 인과성을 추적하는 방법은 다음과 같이 작동한다.
클라이언트가 GET을 실행할 때 값에 반영된 사건들의 인과적인 이력를 나타내는 버전 벡터를 받음.
그 클라이언트가 PUT을 실행할 때, 이전의 GET에서 받았던 버전 벡터를 함께 보냄.
PUT을 실행하는 서버는 새로운 업데이트를 반영하기 위해 로컬 카운터를 증가시키고, 서버의 식별자에 해당하는 버전 벡터의 항목에 저장.
새로운 버전 벡터를 서버에 저장되어 있는 다른 버전 벡터와 비교하고, 낡은 버전들을 모두 버린다.
서로 다른 서버들 사이의 업데이트들 사이의 인과성을 추적하는 것이 가능하지만, 동일한 서버에서 발생한 업데이트들 사이의 인과성을 추적할 수 없다. 즉, 동일한 서버에서 발생한 동시적인 업데이트는 또 다시 last writer wins 정책이 적용되므로, 적어도 하나의 동시적인 업데이트는 잃어버릴 수 밖에 없다. Plausible Clocks에서 설명하듯이, 이러한 문제의 본질적인 원인은, 동시적인 업데이트를 발생시키는 근원에 해당하는 클라이언트의 수에 비해서 적은 수의 버전 벡터 항목을 사용하기 때문이다.
이러한 방식은 Dynamo가 사용하고 있다.
4. 클라이언트별 항목을 가진 버전 벡터 (Version vectors with per-client entry)
서버별 항목을 가진 버전 벡터에서 살펴본 문제를 해결하기 위한 가장 자연스러운 접근 중 하나는 클라이언트별 항목을 가진 버전 벡터를 사용하는 것이다. 동작 방식은 다음과 같다.
클라이언트가 GET을 실행할 때 값에 반영된 사건들의 인과적인 이력을 나타내는 버전 벡터를 받음.
그 클라이언트가 PUT을 실행할 때, 이전의 GET에서 받았던 버전 벡터, 그리고 클라이언트의 식별자와 클라이언트별로 단조증가하는 카운터를 함께 보냄.
이 방식은 서로 다른 클라이언트들에 의해 발생한 동시적인 업데이트들 사이의 인과성을 완전히 추적할 수 있지만, 클라이언트 당 하나의 항목을 필요로 하기 때문에, 버전 벡터들의 크기가 클라이언트들의 수에 비례하게 되고, 이 방식을 실용적으로 사용할 수 없게 만드는 원인이 된다.
Kafka: A distributed messaging system for log processing In Proceedings of 6th International Workshop on Networking Meets Databases (NetDB), Athens, Greece (2011) by J. Kreps, N. Narkhede, J. Rao
Kafka는 기존의 메시징 시스템에서 당연하다고 가정하고 있었지만, 로그 처리 시스템에서는 필요없는 보장들을 과감하게 버리고, 성능 위주의 설계를 함으로써, 실제로 링크 속도에 육박하는 성능을 보여주고 있고, 상당히 단순한 아키텍쳐를 유지하고 있는 점이 흥미로운 점입니다. 그리고, 현실의 데이터 처리에 있어서는 버그나 장애로 인해 흔히 발생하는 재처리가 pull 기반의 소비 모델을 도입함으로써 매우 쉽게 가능해졌다는 점도 눈여겨 볼 부분입니다.
주의: 이 글은 2011에 출판된 Kafka의 페이퍼 내용을 다루고 있으며, 현재의 Kafka 버전의 내용을 다루고 있지 않습니다.
Problem
기존의 엔터프라이즈 메시징 시스템들의 한계를 다음과 같은 이유들로 설명하고 있습니다.
배달 보장 (delivery guarantee)을 위한 기능들은 로그 처리를 위한 시스템 입장에서는 불필요..
처리속도 (throughput)를 디자인 제약으로 고려하지 않음.
분산에 대한 고려가 부족.
메시지가 즉시 소비되는 것을 전제로 하기 때문에 메시지가 쌓일 경우 성능이 하락.
마찬가지로 최근에 만들어진 로그 처리 시스템들 – Scribe, Yahoo’s data highway project, Flume의 한계는 다음과 같은 이유들로 설명하고 있습니다.
로그 데이터를 오프라인으로 처리하는 것을 위해 만들어진 시스템.
대부분은 push 모델.
Kafka Architecture and Design Principles
Kafka의 주요 개념들과 개략적인 디자인은 다음과 같습니다.
토픽 (topic): 특정한 타입의 메시지의 스트림
프로듀서 (producer): 어떤 토픽에 대해 메시지를 발행 (publish)할 수 있음.
프로듀서에 의해 발행된 메시지들은 브로커 (broker)라는 서버들에 저장됨. Kafka 클러스터는 일반적으로 여러 브로커들로 이루어짐.
컨수머 (consumer)는 브로커로부터 1개 이상의 토픽을 구독할 수 있고, 브로커들로부터 데이터를 당김(pull)으로써 구독한 메시지들을 소비 (consume)할 수 있음.
point-to-point 배달 모델: 여러 컨수머가 하나의 토픽 내의 메시지를 하나씩 소비.
발행/구독 (publish/subscribe) 모델: 여러 컨수머가 하나의 토픽의 각자의 복제본 메시지들을 소비.
토픽은 여러 파티션들로 이루어져 있고, 각각의 브로커는 하나 이상의 파티션을 저장.
Efficiency on a Single Partition
Simple storage
어떤 토픽의 각 파티션은 논리적인 로그에 해당.
물리적으로 로그는 거의 동일한 크기의 세그먼트 파일들의 집합으로 구현됨.
프로듀서가 어떤 파티션에 메시지를 발행할 때마다 브로커는 마지막 세그먼트 파일에 메시지를 append 한다.
성능을 위해서 특정 개수의 메시지가 발행되거나 특정 시간이 흐른 후에 세그먼트 파일을 디스크로 flush한다.
메시지는 flush가 된 후에 컨수머에게 노출됨.
전형적인 메시징 시스템과 달리, 카프카의 메시지에는 메시지 식별자가 없고 로그 상의 논리적인 오프셋 (offset)을 주소로 사용.
메시지 식별자로부터 메시지를 찾기 위한 인덱스 구조를 유지할 필요가 없어짐.
Kafka의 메시지 식별자 – 로그 상의 논리적인 오프셋은 자연히 증가하지만, 연속적이지는 않음.
컨수머는 항상 특정한 파티션으로부터 연속적으로 메시지를 소비함.
컨수머가 특정한 메시지 오프셋에 대해 ack을 한다면, 이는 그 파티션의 해당 오프셋 이전의 모든 메시지를 받았음을 의미.
컨수머는 브로커에게 비동기적인 당김 (pull) 요청을 보낸다. 각 요청은 소비할 메시지의 오프셋과 최대로 가져올 메시지의 크기를 포함.
브로커는 모든 세그먼트 파일의 첫번째 메시지의 오프셋들의 목록을 메모리 상에 유지.
브로커는 그 목록을 세그먼트 파일을 찾고, 컨수머에게 데이터를 보낸다.
컨수머가 메시지를 받은 후에는 다음 메시지의 오프셋을 계산해서 다음 요청에서 사용.
Efficient transfer
프로듀서는 한번의 send 요청에 여러 메시지를 보낼 수 있음.
컨수머 API는 한번에 하나의 메시지를 소비하는 것처럼 보이지만, 내부적으로는 각 당김 요청은 특정 사이즈 (보통 수백 KB) 내에 해당하는 여러 메시지를 가지고 옵니다.
메시지들을 직접 메모리 상에 캐싱을 하지 않고 파일 시스템 페이지 캐시에 의존함.
중복된 버퍼링을 피함.
프로세스 내에서 메시지를 캐시하지 않기 때문에 가비지 컬렉션의 오버헤드가 매우 적어짐.
프로듀서와 컨수머는 세그먼트 파일을 순차적으로 액세스하고, 보통 컨수머는 프로듀서보다 살짝 뒤쳐지기 때문에, 보통의 OS 캐싱 휴리스틱 (write through, read-ahead)이 매우 효과적으로 동작함.
sendfile API를 이용해서 복사와 시스템 콜 회수를 줄임.
Stateless broker
대부분의 다른 메시징 시스템과 다르게, 각각의 컨수머가 얼마나 소비했는지는 브로커가 유지하지 않음.
이러한 디자인은 브로커의 복잡도와 오버헤드를 줄임.
문제는 이미 소비한 메시지를 언제 지울지 결정하는 것이 어려워짐.
Kafka는 시간 기준의 SLA를 사용해서 해결하는데, 특정 기간 (보통 1주일) 이상이 지나면 메시지는 자동적으로 삭제됨.
대부분의 컨수머는 비교적 짧은 시간 단위 – 매일, 시간별, 실시간 – 내에 소비를 마치기 때문에 문제가 없음.
카프카는 데이터가 크기에 따라서 성능이 저하되지 않으므로 이러한 방법이 가능.
부가적인 이점으로, 컨수머는 이전의 오프셋으로 되돌아가 메시지를 다시 소비하는 것이 가능해짐.
예를 들어, 컨수머의 로직에 에러가 있을 경우, 에러가 수정된 후에 문제가 된 메시지들을 다시 처리하는 것이 가능.
컨수머가 크래시한다면 flush되지 않은 데이터는 잃어버리겠지만, flush하지 않은 가장 작은 오프셋부터 메시지를 다시 소비하는 것이 가능.
이러한 메커니즘은 push 모델보다 pull 모델에서 훨씬 쉬움.
3.2 Distributed Coordination
각각의 프로듀서는 랜덤하게 선택된 파티션이나 파티셔닝 키와 파티셔닝 함수에 의해 결정된 파티션으로 메시지를 발행할 수 있다.
컨수머 그룹 (consumer group)이라는 개념
토픽의 집합을 consume하는 하나 이상의 컨수머들로 구성.
서로 다른 컨수머 그룹은 독립적으로 구독한 메시지들의 전체 집합을 consume하고, 컨수머 그룹 끼리는 조정?(coordination)이 불필요.
어떤 토픽 내에서 하나의 파티션은 병행성의 최소 단위
주어진 시점에 하나의 파티션으로부터의 모든 메시지는 각 컨수머 그룹 내에서 하나의 컨수머에 의해서만 소비됨.
하나의 파티션을 여러 컨수머가 동시에 소비하도록 했다면 누가 어떤 메시지를 소비할지는 조정해야하는 오버헤드가 발생.
로드를 균형 있게 맞추기 위해서는 파티션의 수는 컨수머의 수보다 더 많아지도록 유지.
시스템을 단순화하기 위해 중앙의 마스터 노드를 가지지 않도록 함.
조정을 위해서 Zookeeper를 사용.
Kafka가 ZooKeeper를 사용하는 작업들
브로커와 컨수머의 추가와 제거를 탐지
브로커나 컨수머가 추가/제거될 경우 각 컨수머에게 리밸런스 프로세스를 트리거.
소비 관계를 유지하고, 각 파티션의 소비된 오프셋을 추적
브로커나 컨수머가 시작하면 주키퍼의 브로커 또는 컨수머 레지스트리에 그 정보를 저장.
브로커 레지스트리는 브로커의 호스트 이름과 포트, 토픽과 파티션의 집합을 포함.
컨수머 레지스트리는 컨수머가 속한 컨수머 그룹과, 구독하고 있는 토픽들의 집합을 포함.
각 컨수머 그룹은 소유권 레지스트리와 오프셋 레지스트리와 연관됨.
소유권 레지스트리는 모든 구독된 파티션에 대해 하나의 ZooKeeper 경로를 가지며, 그 값은 현재 그 파티션을 소비하고 있는 컨수머의 ID.
오프셋 레지스트리는 각각의 구독된 파티션에 대해 파티션 내에서 마지막으로 소비된 오프셋을 저장.
컨수머의 실패는 컨수머 레지스트리에서 컨수머에 해당하는 항목과 소유권 레지스트리에서 그것이 가진 모든 파티션을 삭제.
각 컨수머는 브로커와 컨수머 레지스트리에 대해 ZooKeeper Watcher를 등록하고 브로커나 컨수머 그룹에서 변화가 일어나면 통지를 받음.
컨수머의 시작이나 컨수머가 브로커/컨수머 변화에 대해 통지를 받으면, 컨수머는 소비할 파티션의 부분집합을 정하기 위해 리밸런스 프로세스를 시작.
브로커와 컨수머 레지스트리를 읽어서 각 구독된 토픽에 대해 사용가용한 파티션의 집합과 그 토픽을 구독하고 있는 컨수머의 집합을 계산.
파티션 집합을 범위로 파티셔닝(range-partition)해서 컨수머의 수 만큼의 청크로 나누고 소유할 청크를 고름.
컨수머가 고른 각 파티션에 대해 소유권 레지스트리에 그 파티션의 새로운 소유자로 스스로를 써넣음.
컨수머는 각 소유한 파티션으로부터 데이터를 당겨오는 쓰레드를 시작. 오프셋 레지스트리에 저장된 오프셋으로부터 읽기 시작.
컨수머는 주기적으로 오프셋 레지스트리에 마지막으로 소비한 오프셋을 업데이트.
그룹 내에 여러 컨수머가 있을 때 각각은 브로커/컨수머 변경에 대해 통지를 받지만, 그 통지가 컨수머마다 살짝 다르게 올 수 있음.
하나의 컨수머가 다른 컨수머에 의해 아직 소유된 파티션에 대한 소유권을 가지려고 하는 것이 가능.
이 경우, 첫번째 컨수머는 단순히 모든 파티션을 릴리즈하고 조금 기다린 후 리밸런스 프로세스를 재시도한다.
리밸런스 프로세스는 단 몇번의 재시도 만에 안정화됨.
새로운 컨수머 그룹이 만들어졌을 때는 오프셋 레지스트리에는 오프셋이 없지만, API를 사용해서 최소 또는 최대의 오프셋부터 시작할 수 있음.
Delivery Guarantees
Kafka는 at-least-once 배달만을 보장함.
exactly-once 배달은 보통 2PC를 필요로 하고, 카프카가 대상으로 하는 애플리케이션은 필요로 하지 않음.
보통의 경우, 메시지는 각 컨수머 그룹에 정확히 한번만 배달됨. 하지만, 컨수머 프로세스가 정상적으로 종료되지 않고 죽은 경우, 그 컨수머가 소유하고 있던 파티션을 새롭게 소유하게된 컨수머는 ZooKeeper에 커밋된 마지막 오프셋 후의 메시지들에 대해 중복 메시지를 소비하게 됨.
애플리케이션에 있어서 메시지의 중복이 중요한 문제라면, Kafka가 컨수머에게 돌려주는 오프셋을 이용하거나 메시지 내의 어떤 유일하게 식별 가능한 키를 사용해서 자체적인 중복 방지 로직을 추가해야함.
Kafka는 하나의 파티션으로부터의 메시지는 컨수머로 차레대로 전달되는 것을 보장. 하지만 서로 다른 파티션으로부터 오는 메시지의 순서는 보장하지 않음.
로그가 망가지는 것을 피하기 위해 각 메시지에 대해 CRC를 저장.
Kafka는 CC를 가진 메시지를 삭제하는 리커버리 프로세스를 실행.
이 CRC는 네트워크 에러를 체크할 수도 있게 해줌.
브로커가 죽으면 그것 내에 저장된 소비되지 않은 메시지는 사용불가능해짐.
미래에는 여러 브로커에 메시지를 중첩해서 저장하는 자체적인 복제를 추가할 예정.
Experimental Results
Producer test
Kafka의 Producer 성능이 좋은 이유를 아래와 같은 이유로 설명하고 있습니다.
Kafka 프로듀서는 브로커로부터의 ack을 기다리지 않고 브로커가 다룰 수 있는 만큼 빠르게 메시지를 보냄.
배치 크기가 50일 때는 Kafka 프로듀서는 프로듀서-브로커 사이의 1GB 링크에 포화됨. (saturated)
ack이 없으므로 발행된 모든 메시지가 실제로 브로커에 의해 받아진 건지 보장할 수 없음.
로그 데이터에 대해서는 영속성을 처리속도와 트레이드를 하는 것이 좋음.
미래에 영속성이 중요한 데이터에 대한 개선을 계획 중임.
효율적인 스토리지 형식
평균적으로 Kafka에서는 각 메시지는 9바이트의 오버헤드를 가짐. 반면, ActiveMQ에서는 144 바이트.
ActiveMQ에서의 오버헤드는 JMS가 요구하는 무거운 메시지 헤더에서 옴.
여러 인덱싱 구조를 유지하기 위한 비용도 존재.
배치 방식은 RPC의 오버헤드를 감추어서 (amortize) 처리속도를 크게 개선함.
Consumer Test
Kafka가 좋은 성능을 보여주는 이유를 다음과 같이 설명하고 있습니다.
효율적인 스토리지 포맷
ActiveMQ, RabbitMQ는 모든 메시지의 배달 상태를 유지하기 위해 꾸준히 쓰기가 발생하지만, Kafka의 브로커는 소비를 위한 디스크 쓰기가 발생하지 않음.
sendfile API의 사용.
Future Works
Kafka가 미래에 지원할 기능으로 우선 영속성을 위한 자체적인 복제 지원을 들고 있습니다. 비동기-동기 복제를 둘다 지원하려고 하고, 애플리케이션의 여러가지 요구에 따라 적절한 중첩 레벨을 선택할 수 있도록 하려 한다고 합니다. 다른 하나는 스트림 프로세싱입니다. 기본적인 레코드 수를 센다든지, 다른 스트림이나 다른 스토리지의 데이터와 join하는 등의 처리가 가능하게 하려고 합니다. 우리는 현재의 Kafka는 스트림 프로세싱을 지원하지 않고, Kafka와 협동하는 다른 프레임워크에 위임하는 방식으로 이루어지고 있다는 것을 알고 있습니다.
최근 몇개월 동안 상대적으로 크기가 작고 많은 파일들을 비교적 단순한 솔루션을 이용해서 효율적으로 저장하는 것에 대해서 관심이 있어서 읽어본 페이퍼이다. 파일들의 개수로 인한 메모리 사용량과 복제 시간 등이 골칫거리라서 커다란 파일에 저장하는 방법을 여러가지로 검토하고 있었는데, 매우 실용적이고 깔끔한 접근으로 문제를 해결하고 있고, 실제로 개발 비용도 적게 (몇개월 정도 걸렸다고 한다) 들었으므로, 2009년 경의 시스템이지만, 서비스를 다루는 엔지니어로서는 배울 점이 있다고 생각한다. 이미지 파일 자체가 불변 데이터이므로 상대적으로 쉬운 문제인 것은 맞지만, 실제로 서비스에 적용하는 것은 또 다른 문제이다. 오픈소스화 되어 있지 않은 것이 아쉬운 점이라고 할 수 있는데 – 실리콘 밸리의 큰 회사들 치고는 의외로 Facebook은 오픈소스에 그렇게 열려있지 않은 문화인 것 같다 – 구현이 비교적 단순하기 때문인지 수많은 Haystack Clone들이 있다. 혹시 좋은 구현을 찾게되면 소개할 기회가 있을지도 모르겠다.
Problem – NFS Approach
Facebook은 NAS와 NFS를 이용하는 사진 스토리지를 구축하고 있었으며 다음과 같은 문제를 인식하고 있었다.
사진을 저장할 때 전통적인 POSIX 파일시스템에서 요구하는 파일 메타데이터는 필요하지 않음.
사진 파일을 읽기 위해서 3번의 디스크 오퍼레이션이 필요하고 메타데이터의 액세스가 병목이 됨.
디렉토리의 blockmap이 너무 커서 효과적으로 캐시되지 않음.
Hot한 액세스는 CDN으로 다룰 수 있지만, long tail 성격을 가진 액세스가 있으므로, 캐싱만으로 해결할 수 없음.
대체 기술들도 적합하지 않음.
MySQL, Hadoop
RAM을 늘리는 것만으로는 비효율적.
Design & Implementation
Haystack은 다음과 같은 목표를 달성해서 NFS approach의 bottleneck을 줄인다.
실제 데이터를 읽기 위한 디스크 오퍼레이션을 단 1번만 필요하도록 한다.
메타데이터를 위한 메모리 사용량을 줄임.
기본적인 접근 방식은 여러 사진을 하나의 커다란 파일에 저장하는 방식을 사용하는 것이다.
전체적인 시스템 구성은 Haystack Store, Haystack Directory, Haystack Cache의 3개의 서브시스템으로 이루어져 있다.
Store
Persistent storage로 파일시스템 메타데이터를 관리하는 컴포넌트.
Logical volume은 서로 다른 장비의 Physical volume들로 구성됨.
Directory
Logical Volume으로부터 Physical volume으로의 맵핑.
각 사진들이 있는 Logical volume이나 여유공간이 있는 Logical volume을 관리한다.
Cache
내부적인 CDN과 같은 역할.
웹서버는 디렉토리를 이용해서 URL을 구성하는데, 특이한 것은 사진의 URL에 Logical volume이나 장비에 대한 힌트 등 사진을 가져오기 위한 정보가 들어가 있다는 점이다.
URL은 CDN host/Cache/Machine ID/Logical volume, Photo와 같은 형태인데, CDN 또는 Haystack Cache는 (Logical volume, Photo) 정보만을 이용해서 자신의 캐시로부터 사진을 찾는다.
CDN이나 Cache로부터 miss일 경우에는 Cache address를 URL로부터 제거하고 Store 장비에 사진을 요청한다.
Haystack Directory
Haystack Directory는 다음과 같은 4가지 기능을 가지고 있다.
Logical volume을 Physical volume으로 맵핑
어떤 Store 장비가 소실된 상황에서는 mapping에 상응하는 엔트리를 삭제하고 온라인이 된 새로운 머신으로 대체.
쓰기 액세스를 여러 로지컬 볼륨에 대해 로드 밸런싱, 읽기 액세스를 여러 피지컬 볼륨에 대해 로드밸런싱
리퀘스트가 CDN으로 핸들링되어야할지 Cache로 핸들링되어야할지 결정
로지컬 볼륨이 읽기전용인지 아닌지에 대한 판단. (운영적인 이유나 용량 한계)
새로운 장비를 추가하면 이 장비는 쓰기가 허용되고, 쓰기가 허용된 장비만 사진의 업로드를 받는다.
Haystack Directory는 복제가 되는 데이터베이스에 정보들을 저장하고 PHP interface로 액세스되며, latency를 줄이기 위해 memcache를 사용한다고 한다.
Haystack Cache
Haystack Cache는 Photo ID를 키로 데이터를 찾을 수 있는 Distributed hash table로 구성되어 있다. 어떤 DHT를 사용하고 있는지에 대해서는 자세히 설명되어 있지 않다. 캐시되어있지 않다면, URL에 명시되어 있는 Store 장비에서 사진을 가져온다. 캐시하는 조건이 조금 특이한데 다음과 같다.
사용자로부터 직접 온 리퀘스트이고 CDN이 아닐 때: CDN에서 miss된 것이 내부 캐시에서 hit될 가능성이 낮음.
쓰기가 허용된 Store 장비로부터 사진을 가져왔을 때: 사진은 업로드된 뒤에 헤비하게 액세스되므로 쓰기가 허용된 장비를 과도한 읽기 액세스로부터 보호.
Haystack Store
전반적인 구조
각 Store 장비는 여러 개의 Physical volume을 관리하고, 각 volume은 수백만 개의 사진을 저장한다.
이를테면, Physical volume은 100GB이고, /hay/haystack…<logical volume id>와 같이 저장됨.
Store 장비는 Logical volume의 ID와 file offset을 이용해서 사진을 액세스할 수 있다.
Store 장비는 각 Physical volume에 대해 file descriptor를 유지한다.
Physical volume은 수퍼블럭과 needle의 sequence로 이루어져 있음.
needle을 빠르게 가져오기 위해서 각 볼륨에 대해 In-memory data structure를 유지하고 있음.
(key, alternative key) -> (needle’s flags, size in bytes, volume offset)
alternative key는 4개의 서로 다른 크기의 사진을 나타냄.
읽기 (Photo Read)
읽기 요청에는 Logical volume ID, Key, Alternative key, Cookie가 있다.
사진이 업로드될 때 cookie는 랜덤하게 생성되어 Directory에 저장된다. 사진의 URL을 추측해서 액세스하는 공격을 막아준다.
In-memory 맵핑에서 관련된 메타데이터를 찾고, 지워진 사진이 아니라면 적절한 offset을 seek해서 전체 needle을 읽어들임.
읽어들인 needle로부터 cookie를 검증하고 데이터의 integrity를 체크 후 문제가 없다면 응답.
쓰기 (Photo Write)
쓰기 요청에는 Logical volume id, Key, Alternate key, Cookie, Data가 있다.
Fidge, C.J., Timestamps in Message-Passing Systems that Preserve the Partial Ordering, Proc. 11th Australian Comp. Sci. Conf., 1988, pp. 56-66.
Timestamping is a common method of totally ordering events in concurrent programs. However, for applications requiring access to the global state, a total ordering is inappropriate. This paper presents algorithms for timestamping events in both synchronous and asynchronous message-passing programs that allow for access to the partial ordering inherent in a parallel system. The algorithms do not change the communication graph or require a central timestamp issuing authority.
이 논문은 Vector Clock으로 알려진 개념을 소개하고 있다. Vector Clock은 [Fidge 1998]과 [Mattern 1998]에서 독립적으로 고안된 알고리즘인데, Fidge의 논문쪽이 좀 더 읽기 쉬운 편이라고 한다.
Limitation of Lamport’s logical clock
Clock Condition. For any events a, b: if a → b then C(a) < C(b).
Lamport가 정의한 논리적 시계는 두 사건 사이의 happened-before 관계가 있다면 이를 반영하는 시계를 정의하고 있습니다. 그리고 이를 이용해 임의의 완전순서를 정의하고 있습니다. 이 완전순서는 임의의 프로세스 사이의 순서를 이용하고 있으므로 사건들을 정렬할 수 있는 방법 중의 임의적인 한 방법이고, 논리적 시계 이상의 어떤 정보가 추가되지는 않습니다.
Lamport가 정의한 논리적 시계의 문제는 이 논리적 시계를 통해서 사건들 사이의 인과성 (causality)를 알아낼 수 없다는 것입니다. 즉, C(a) < C(b)라고 해서 a → b라고 말할 수 없는 것입니다. 구체적으로 말해, C(a) < C(b)가 의미할 수 있는 것은 a와 b 사이에 happend-before 관계가 있는 것, 즉 a → b인 경우와, a와 b 사이에 happend-before 관계를 판단할 수 없고 b가 a에 대해 happened-before가 아닌 것, 즉 b ↛ a인 경우가 있습니다.
Vector Clock
Vector Clock이라는 단어를 논문에서 사용하고 있지는 않지만, 그 단어가 의미하듯이 논리적 시계의 타임스탬프를 integer가 아니라 다음과 같이 프로세스 수 만큼의 타임스탬프를 가진 배열로 대체하고 있습니다.
[c1, c2 … cn]
그리고 이 타임스탬프를 유지하기 위한 알고리즘은 아래와 같습니다. 타임스탬프가 배열이 되었다는 것 이외에 Lamport의 알고리즘과 큰 차이가 없다고도 얘기할 수 있겠지만, 주의를 기울여야 하는 부분은 메시지를 수신하는 프로세스에서는 메시지를 송신한 프로세스에 해당하는 타임스탬프 배열 항목의 값만을 증가시킨다는 것입니다.
Rule RA 1: Initially all values are zero.
Rule RA 2: The local clock value is incremented at least once before each atomic event
Rule RA 3: The current value of entire timestamp array is piggybacked on every outgoing signal
Rule RA 4: Upon receiving a signal, a process sets the value of each entry in the timestamp array to be the maximum of the two corresponding values in the local array, and in the piggybacked array received. The value corresponding to the sender, however, is a special case and is set to be one greater than the value received (to allow for transit time), but only if the local value is not already greater than that received (to allow for signal “overtaking” as described below), i.e.
q?other_array; /* receive timestamp array from process q */
if local_array[q] <= other_array[q] then
local_array[q] := 1 + other_array[q];
for i := 1 to n do
local_array[i] := max(local_array[i], other_array[i]);
Rule RA 5: Values in the timestamp arrays are never decremented.
이 논문에서는 설명하고 있지 않지만, 이러한 알고리즘으로 얻어지는 타임스탬프의 의미는 수신한 메시지에 근거해서 추측할 수 있는 가장 정확한 송신 측의 타임스탬프라는 것입니다. 또한, 수신한 프로세스에서 발생할 현재의 사건에 대해 happened-before 관계에 있는 송신 측의 프로세스의 가장 마지막 사건의 타임스탬프이기도 합니다.
이러한 타임스탬프 배열은 다음과 같이 비교될 수 있습니다.
각각 프로세스 p, q에서 실행된 사건 e, f를 ep, fq로 나타내고, 각 사건에 대한 타임스탬프 array를 Tep, Tfq, 그 타임스탬프 배열 중 프로세스 p에 해당하는 타임스탬프를 Tep[p], Tfq[p] 라고 할 때,
ep → fqiff Tep[p] < Tfq[p]
여기서 주목할 점은 iff 즉, 사건의 happened-before 관계와 타임스탬프의 대소 관계가 동치관계라는 것입니다. 임의의 두 사건이 주어졌을 때 인과성이 있는지 여부와 어떤 사건이 어떤 사건에 대해 인과성이 있는지를 판단할 수 있습니다.
ep ↔ fq if Tep[p] ≮ Tfq[p] and Tfq[p] ≮ Tep[p]
타임스탬프 사이의 대소 관계가 어느 방향으로도 성립하지 않는다면 두 사건은 동시적(concurrent)하다고 얘기할 수 있습니다.
Applications
그렇다면 논리적 시계를 통해서 인과성을 알아내는 것은 왜 필요한가에 대해서 이 논문이 예로 들고 있는 것 중 하나는 여러 프로세스들에 의해서 저장되는 상태의 스냅샷들에 타임스탬프를 이용해서, 전체 프로그램 상태에 대해 정상적인 상태인지 검사를 할 수 있고 이를 통해 역실행 (reverse execution), 에러의 복구, 롤백 등이 가능하다는 것입니다. 실제로 이후에 발표된 Dynamo나 Riak 등 Vector Clock을 이용하는 것으로 알려진 시스템에서는 동시적인 쓰기 등에 의해 어떤 데이터에 대해 2개 이상의 복제본이 발생했을 때 이를 해소하기 위해서 인과성을 활용하고 있는 사례를 볼 수 있습니다.
References
[Fidge 1998] Fidge, C.J., Timestamps in Message-Passing Systems that Preserve the Partial Ordering, Proc. 11th Australian Comp. Sci. Conf., 1988, pp. 56-66.
[Mattern 1998] Mattern, F. Virtual time and global states of distributed systems. Proc. “Parallel and distributed algorithms” Conf., (Cosnard, Quinton, Raynal, Robert Eds), North-Holland, 1988, pp. 215-226.
The concept of one event happening before another in a distributed system is examined, and is shown to define a partial ordering of the events. A distributed algorithm is given for synchronizing a system of logical clocks which can be used to totally order the events. The use of the total ordering is illustrated with a method for solving synchronization problems. The algorithm is then specialized for synchronizing physical clocks, and a bound is derived on how far out of synchrony the clocks can become.
Leslie Lamport는 분산 컴퓨팅 분야에서는 너무나 유명한 분이기 때문에 따로 설명할 필요가 없을 정도입니다. 예를 들어, 1978년에 출판된 “Time, Clocks, and the Ordering of Events in a Distributed System”과 같은 페이퍼는 인용 회수로 볼 수 있는 그 영향력 뿐만 아니라 OS 수업의 읽기 과제로도 빠질 수 없는 그야말로 seminal work입니다.
1. “Happened before” 관계
가장 먼저, “Happened before” 관계라는 분산시스템 내에서 사건(event)들의 순서를 표현할 수 있는 부분순서(partial ordering)를 제안하고 있습니다.
“Happened before” relation은 분산시스템에서 구현이 어려운 물리적 시계 (physical clock)를 사용하지 않더라도 사건들의 순서를 표현할 수 있음을 보여주고 있습니다. 즉, 사건이 발생한 전역적으로 통용되는 시각을 알지 못해도 우리는 자연스럽게 사건의 순서를 정할 수 있습니다.
또한, 서로 다른 프로세스 사이에서 메시지를 주고 받을 경우에 사건 사이의 인과성(causality)이 발생하는 것을 다음과 같은 정의에 담고 있습니다.
Definition. The relation → on the set of events of a system is the smallest relation satisfying the following three conditions:
If a and b are events in the same process, and a comes before b, then a → b.
If a is the sending of a message by one process and b is the receipt of the same message by another process, then a → b.
If a → b and b → c then a → c.
Two distinct events a and b are concurrent if a ↛ b and b ↛ a. Assume a ↛ a for any event a. So → is an irreflexive partial ordering on the set of all events in the system.
2. Logical Clocks
논리적 시계는 하나의 프로세스 내의 각 사건들에 대해 단조 증가하는 숫자를 할당하는 것으로 볼 수 있으며, 물리적 시계가 아니라 사건이 발생한 순서에 기초해야 합니다.
Clock Condition. For any events a, b: if a → b then C(a) < C(b).
“Happened before” 관계의 정의에 따라, 각 프로세스는 사건이 발생할 때마다 증가하는 타임스탬프를 부여하며, 프로세스 사이에 메시지를 송신할 때 타임스탬프를 함께 송신하며, 메시지를 수신하는 측에서는 수신한 타임스탬프보다 더 큰 타임스탬프를 새로운 사건에 할당합니다.
3. Ordering the Events Totally
위에서 정의된 논리적 시계만으로는 분산시스템 내의 사건들의 완전순서(total ordering)는 불가능하기 때문에, 프로세스 사이의 임의의 순서(!)를 도입해, 완전순서를 만들 수 있습니다.
In case two or more events occur at the same time, an arbitrary total ordering ≺ of processes is used. To do this, the relation ⇒ is defined as follows:
If a is an event in process Pi and b is an event in process Pj , then a ⇒ b if and only if either: i. Ci<a> < Cj<b> or ii. Ci<a> = Cj<b> and Pi ≺ Pj
이렇게 정의된 완전순서를 이용해 분산시스템에서의 상호배제(mutual exclusion) 문제를 해결하는 방법을 제시하고 있습니다.
간단히 얘기하면, 어떤 프로세스가 자원을 요청할 때는 자신의 타임스탬프를 함께 다른 모든(!) 프로세스들에게 전송하고, 이 자원의 요청은 요청한 프로세스 뿐만 아니라 모든 프로세스의 큐에 위에서 정의된 완전순서대로 정렬됩니다. 이 큐에서 가장 앞에 있는 자원 요청이 자원을 획득해야한다고 볼 수 있는데, 모든 프로세스들이 동일한 큐를 유지해야하기 때문에, 자원 획득을 위해서는 모든 다른 프로세스들로부터 receipt 메시지를 수신한 조건에서만 자원 획득이 가능합니다. 또한, 다른 프로세스의 자원 요청에 대한 응답과 자신의 자원 요청의 (타임스탬프의 순서는 올바르다고 하더라도) 전송 순서가 바뀌는 경우는 두 프로세스 사이의 통신은 순서대로 일어난다는 가정에 의해 발생하지 않습니다.
이 구현은 1개 이상의 프로세스의 실패나 메시지의 소실 등을 가정하고 있지 않기 때문에 현실적으로 사용할 수 있는 알고리즘은 아니지만, 분산시스템에서의 완전순서의 유용성을 보여주고 있다고 생각합니다. 그리고, Lamport가 이후에 제안한 Paxos가 바로 현실적인 환경 하에서의 완전순서를 보장하기 위한 방법을 제공하고 있는 것으로 보입니다.
4. Anomalous Behavior
위에서 제시한 상호배제 알고리즘에서 사건 사이의 인과성이 외부화됨으로써 발생할 수 있는 문제를 제시하고 있습니다. 예를 들어, 자원 요청 A를 한 후, 전화를 걸어 다른 컴퓨터에서 자원 요청 B를 한 경우, 자원 요청 A와 자원 요청 B 사이에는 실제로 인과성이 존재하지만, 논리적 시계의 체계는 이를 인지하지 못하므로, 자원 요청 B가 더 낮은 타임스탬프를 획득해 먼저 자원을 획득할 수 있다는 것입니다.
이를 해결하기 위한 방법으로 2가지 방법을 제시하고 있는데, 첫번째는 “happened-before” 관계에 필요하지만 외부화된 정보를 시스템 내로 도입하는 것입니다. 즉, 자원 요청 B를 하는 사용자에게 자원 요청 A 보다 더 이후의 타임스탬프를 발급하도록 자원요청 A의 타임스탬프를 전화를 통해 알려주는 방법 등으로 논리적 시계 체계를 유지하는 책임을 부여하는 것입니다.
다른 방법은 외부화된 사건들 사이의 관계를 포함해 모든 사건들을 정렬할 수 있는 물리적인 시계의 체계를 구성하는 것입니다. 이 시계들은 다음의 Strong Clock Condition을 만족해야 합니다.
Let S be the set of all system events and S be the set containing S along with relevant events external to the system. Let ↪ denote the “happened before” relation for S.
For any events a, b in S: if a ↪ b then C<a> < C<b>.
5. Physical Clocks
Strong Clock Condition을 만족하기 위해서는 물리적 시계는 다음과 같은 조건을 만족해야합니다.
Let Ci(t) denote the reading of clock Ci at physical time t. Assume that Ci(t) is a continuous, differentiable function of t except for isolated discontinuities introduced by clock resets. PC1. There exists a constant k << 1 such that for all i: |dCi(t) / dt – 1| < k. PC2. For all i, j : |Ci(t) – Cj(t)| < e.
PC1은 물리적 시계가 가는 속도가 물리적 시간이 흐르는 속도와 일정 오차 범위 내에 있다는 것을 의미합니다. 일반적인 수정 발진 방식의 시계의 경우, k는 약 10-6정도로 언급하고 있으며, PC1 자체는 만족되는 것으로 가정하고 있습니다.
한편, PC2는 물리적 시계들 사이의 오차가 일정 범위 내에 있다는 것을 의미합니다. PC2를 보장하기 위해서 물리적 시계의 보정을 위한 구현 방법을 제시하고 있습니다.
Let vm = t’ – t be the total delay of m which is unknown to the receiving process. Let µm be some minimum delay >= 0 known by the receiving process such that µm <= vm.
IR2′. a. If Pi sends a message m at physical time t, then m contains a timestamp Tm = Ci(t). b. Upon receiving a message m at time t’, process Pj sets Cj(t’) equal to max(Cj(t’ – 0), Tm + µm).
메시지에 타임스탬프를 넣어서 보내고, 이를 수신하는 측에서는 메시지를 통해 받은 타임스탬프와 최소 통신 지연 시간 이후의 시각으로 시계를 재설정하는 방식이라고 할 수 있고, 이러한 구현 요구사항을 만족한다면, PC2를 만족하는 것을 증명할 수 있다고 합니다. 이 증명에서는 일정 시간 내에 메시지가 전체 프로세스 사이로 전송되는 상황을 가정하고 있으므로, 메시지를 주고 받지 않는 조건을 염두에 두고 있는 것 같지는 않습니다.
한편, 최소 지연시간에 해당하는 µm은 일반적으로 거리와 빛의 속도로 계산할 수 있는 수준의 값으로 언급되고 있습니다. 그리고, 주어진 µm에 대해 k와 e가 얼마나 작아야 하는지에 대한 계산 방법도 설명하고 있습니다.
현실적으로는 이 논문에서 정의된 Physical Clock은 항상 가장 빠른 시계에 맞춰지게 되므로 실제 시간 (physical time)보다 더 빨라지게 되겠지만, 분산시스템의 동기화 문제를 해결하는 능력과는 무관하다고 할 수 있을 것 같습니다.
6. Closing
이 논문은 분산시스템의 문제에 대한 이해에 가장 중요한 사고 도구로 사용할 수 있는 중요한 개념들을 정의하고 있는 것 같습니다. 내용 자체를 정확하게 이해했는지의 여부를 차치하더라도, 이러한 개념이 마음에 와닿기 위해서는 몇 번은 더 읽어봐야 할 것 같습니다.
다음에는 Vector Clock에 관한 논문인 C. Fidge의 Timestamps in Message-Passing Systems That Preserve the Partial Ordering을 읽어볼 예정입니다.