Software Development

Paper: Dotted version vectors: Logical clocks for optimistic replication (Part 2)

Paper: Dotted version vectors: Logical clocks for optimistic replication (Part 1)

A Kernel for Eventual Consistency

인과성을 이용하는 분산 스토리지의 동작에 있어서 논리적 시계 집합에 대한 sync 와 update 2개의 오퍼레이션이 핵심을 이루고 있다고 주장하고 있다.

먼저 sync 오퍼레이션의 경우에는 두 개의 시계 집합을 취해서 두 집합의 원소들인 논리적 시계들 사이에 인과성의 관계가 있다면 이전에 해당하는 시계를 모두 버리고, 남아있는 원소들로 구성된 집합을 반환하는 오퍼레이션이다. 결과적으로 반환되는 집합은 동시적 (concurrent)인 관계에 있는 시계들로만 이루어지게 된다.

sync는 클라이언트와 서버 도는 서버의 노드들 사이의 동기화가 필요한 시점에 논리적 시계에 기반해서 과거의 값들을 버리기 위한 오퍼레이션이라고 볼 수 있다. 여기서 재미있는 것은 sync는 논리적 시계가 실제로 어떻게 구현되어있는지에 상관없이 시계들 사이의 부분순서 (partial order)만을 이용해서 일반적으로 정의할 수 있다는 것이다.

sync_defined_by_partial_order

 

update 오퍼레이션은 어떤 시계 집합 (통상적으로 클라이언트)과 서버의 어떤 노드의 시계 집합, 서버의 식별자를 취하고 하나의 시계를 반환하는 오퍼레이션이다. 이 시계는 클라이언트 시계 집합 내의 모든 시계들을 dominate하고, 시스템 내의 시계들의 어떤 join에 의해서도 dominate되지 않아야 한다. (즉, dominate하거나 concurrent 해야한다.)

인과적인 이력 (causal histories)의 경우, update 오퍼레이션은 다음과 같이 정의할 수 있다. 시스템 전체에서 고유한 사건 식별자를 얻어서 클라이언트 시계 집합의 각 시계에 추가하는 방식이다.

update_operation_of_causal_histories

이어서 분산 스토리지의 get/put 오퍼레이션에서 위에서 정의한 sync/update 오퍼레이션을 이용해서 논리적 시계를 어떻게 다루는지에 대해서 설명하고 있으나 여기서는 생략하기로 하자.

Dotted Version Vectors

버전 벡터가 (id, m)의 형태로 표기된다면 dotted version vector는 (id, m, n)과 같이 표기할 수 있다. 버전 벡터가 연속적인 인과적인 이력을 표현하고 있다면, dotted version vector는 그 연속적인 이력에 n에 해당하는 독립적인 사건을 추가한 것을 표현할 수 있다.

dotted_version_vector_definition

 

예를 들어, {(a,2),(b,1),(c,3,7)}이라는 dotted version vector는 {a1, a2, b1, c1, c2, c3, c7}와 같은 인과적인 이력을 표현하고 있다.

dotted version vector를 인과적인 이력으로 정의했다면, 부분순서가 어떻게 정의되는 지를 살펴볼 때다.

dotted_version_vector_partial_order

 

 

dotted version vector를 인과적인 이력으로 변환해서 생각하면 당연한 결과라고 할 수 있다.

부분순서가 정의되어있으므로 sync 오퍼레이션은 추가적으로 정의할 필요가 없으나, update 오퍼레이션은 dotted version vector에 대해 정의할 필요가 있다.

dotted_version_vector_update_function

 

이 간결한 식에 매우 많은 의미를 담고 있는데, 합집합(union)의 좌항의 경우, 클라이언트의 시계집합(S)에 속하는 노드의 식별자들 중 파라미터에서 제공한 노드의 식별자 (r)가 아닌 것들에 대해서,  각각의 식별자와 클라이언트 시계집합에서 해당 식별자에 대한 가장 큰 sequence를 pair로 하는 버전 벡터들을 나타낸다. 우항의 경우에는 파라미터에서 제공한 노드의 식별자(r)에 대한 dotted version vector를 구성하고 있는데, 좌항의 경우와 유사하게 첫번째 정수는 클라이언트의 시계집합에서 해당 식별자에 대한 가장 큰 sequence가 된다. 양쪽 모두 클라이언트의 문맥을 표현하는 것이라고 볼 수 있다. 두번째 정수는 조금 특이한데, 파라미터에서 제공한 노드의 시계 집합에서 역시 해당 노드의 식별자에 대한 가장 큰 sequence를 얻은 후 1만큼 증가시켜준 값으로 설정된다. 이는 노드 상의 문맥에서 업데이트로 인한 새로운 사건을 기록한 것이라고 볼 수 있다.

이러한 update 오퍼레이션을 그대로 적용하면 다음과 같이 아름답게 움직이는 시스템이 된다.

dotted_version_vector_operations

 

논문에서는 dotted version vector에 대한 correctness에 대해서 설명하고 있는 듯 하나 생략하도록 한다. 이 논문을 여러번 다시 읽어보았지만 dotted version vector가 어떻게 문제를 해결하는지에 대한 직관적인 설명을 하는 것이 아직도 어려운 것 같다.

dotted_version_vector_reason_of_correctness

Related Work

  • Lamport clock
    • L. Lamport, “Time, clocks and the ordering of events in a distributed system,” Communications of the ACM, vol. 21, no. 7, pp. 558–565, Jul. 1978.
      • http://blog.lastmind.net/archives/720
  • Version vector
    • D. S. Parker, et al., “Detection of mutual inconsistency in distributed systems,” Transactions on Software Engineering, vol. 9, no. 3, pp. 240–246, 1983.
  • Vector clock
    • C. Fidge, “Timestamps in message-passing systems that preserve the partial ordering,” in 11th Australian Computer Science Conference, 1989, pp. 55–66.
      • http://blog.lastmind.net/archives/736
    • F. Mattern, “Virtual time and global clocks in distributed systems,” in Workshop on Parallel and Distributed Algorithms, 1989, pp. 215–226.
  • Dynamic creation and retirement of vector entries
    • R. A. Golding, “A weak-consistency architecture for distributed information services,” Computing Systems, vol. 5, pp. 5–4, 1992.
    • K. Petersen, et al., “Flexible update propagation for weakly consistent replication,” in Sixteen ACM Symposium on Operating Systems Principles, Saint Malo, France, Oct. 1997.
    • P. S. Almeida, et al., “Interval tree clocks,” in Proceedings of the 12th International
      Conference on Principles of Distributed Systems, ser. OPODIS ’08. Berlin, Heidelberg: Springer-Verlag, 2008, pp. 259–274.
  • Scalability problems
    • B. Charron-Bost, “Concerning the size of logical clocks in distributed systems,” Information Processing Letters, vol. 39, pp. 11–16, 1991.
    • D. H. Ratner, “Roam: A scalable replication system for mobile and distributed computing,” Ph.D. dissertation, 1998, uCLA-CSD-970044.
    • R. Prakash and M. Singhal, “Dependency sequences and hierarchical clocks: Efficient alternatives to vector clocks for mobile computing systems,” Wireless Networks, pp. 349–360, 1997, also presented in Mobicom96.
    • P. Mahajan, S. Setty, S. Lee, A. Clement, L. Alvisi, M. Dahlin, and M. Walfish, “Depot: Cloud storage with minimal trust,” in OSDI 2010, Oct. 2010.
    • D. Malkhi and D. B. Terry, “Concise version vectors in winfs,” in DISC, ser. Lecture Notes in Computer Science, P. Fraigniaud, Ed., vol. 3724. Springer, 2005, pp. 339–353.
    • V. Ramasubramanian, et al., “Cimbiosys: a platform for contentbased partial replication,” in Proceedings of the 6th USENIX symposium on Networked systems design and implementation. Berkeley, CA, USA: USENIX Association, 2009, pp. 261–276.
    • F. J. Torres-Rojas and M. Ahamad, “Plausible clocks: constant size logical clocks for distributed systems,” Distributed Computing, vol. 12, no. 4, pp. 179–196,
      1999.
  • Trade-off
    • B. B. Kang, R. Wilensky, and J. Kubiatowicz, “The hash history approach for reconciling mutual inconsistency,” in Proceedings of the 23nd International Conference
      on Distributed Computing Systems (ICDCS). IEEE Computer Society, 2003, pp. 670–677.

 

Paper: Dotted version vectors: Logical clocks for optimistic replication (Part 2) 더 읽기"

Paper: Challenges to Adopting Stronger Consistency at Scale

AJOUX, Phillipe, et al. Challenges to Adopting Stronger Consistency at Scale. In: 15th Workshop on Hot Topics in Operating Systems (HotOS XV). USENIX Association. (pdf, slides)

최근 geo-replicated data store에서 높은 consistency를 제공하기 위한 연구들이 많이 있다고 한다. – 이 논문에서 나열하고 있는 그런 논문들의 목록도 흥미로운데 이는 마지막에 한번 정리해보자. – Facebook에서도 사용자들에게 – 특히, 읽기에 대한 – end-to-end consistency를 제공하기 위해서 이러한 연구 결과들을 도입하고 싶으나, 여러가지 이유로 이것이 쉽지가 않은데, 그 이유가 무엇인지 일단 알아보자…라는 것이 이 논문의 취지.

There have been many recent advances in distributed systems that provide stronger semantics for geo-replicated data stores like those underlying Facebook. These research systems provide a range of consistency models and transactional abilities while demonstrating good performance and scalability on experimental workloads. At Facebook we are excited by these lines of research, but fundamental and operational challenges currently make it infeasible to incorporate these advances into deployed systems. This paper describes some of these challenges with the hope that future advances will address them.

Facebook은 확장성과 효율을 위해서 샤딩, 복제, 캐싱 등을 아주 많이 활용하고 있는 것으로 보인다. Sodcial graph 시스템을 예로 들고 있는데, 하나의 node나 edge가 MySQL(!)에 쓰여지면, 이것이 비동기적으로 다른 데이터 센터로 복제되고, 여러 계층의 캐시들을 업데이트하고, pub-sub 시스템에 의해 검색이나 뉴스피드를 위한 다른 시스템으로 전달된다. 당연한 이야기지만, Facebook 내의 여러 기능에 해당하는 시스템들은 효율을 위해 각자 특별한 샤딩 방식이나 캐시, 인덱스들을 유지해야한다. Twitter (http://blog.lastmind.net/archives/664)나 LinkedIn (http://blog.lastmind.net/archives/657)의 시스템과 같이 Primary Data Store (DB)로부터 비동기적인 복제를 통해 Secondary Index를 생성하거나 다른 서비스로 이벤트를 전달하는 방식은 현재의 대규모 인터넷 서비스에서는 매우 공통적인 부분으로 보인다.

Facebook relies on sharding, data replication, and caching to efficiently operate at scale. When a node or edge is added to the social graph, it is first written to a single MySQL instance. From there it is asynchronously replicated to all of our data centers, updated or invalidated in multiple layers of cache, and delivered through a scalable publisher-subscriber system to other services such as Search and News Feed. No single data placement strategy can efficiently serve all workloads in a heavily sharded system, so many of these services choose a specialized sharding function and maintain their own data store, caches, and indexes.

facebook_data_layout

Fundamental Challenges

Integrating Across Stateful Services

Facebook의 Social graph는 여러 서비스들을 위한 시스템에 각 서비스를 위한 형태로 복제되고 있다고 한다. 문제는, 대부분의 연구 결과들에서 나타나는 설계들은 단일한 서비스를 상정하고 있다는 것이다. Facebook는 그러한 여러한 서비스가 따로 사용자에게 제공되기 보다는 하나의 서비스처럼 제공되므로 여러가지 문제들이 발생하게 된다.

Facebook’s architecture of cooperating heterogeneous services is different from the monolithic service that
most research designs assume. This difference alone is not fundamental—Facebook can be externally viewed as a single service even though internally it is comprised of many services.

Storing Data Consistently Across Services

Facebook의 각 서비스들은 Social graph의 일부에 대한 캐시, 인덱스, 복제본 등을 가지고 있다. Wormhole이라는 Facebook의 pub-sub 시스템은 업데이트를 비동기적으로 여러서비스들에 전달해주고 있다.

In a sharded and scaled environment like Facebook’s, services may maintain their own optimized cache, index, or copy of a portion of the social graph. Wormhole [40] is our scalable pub-sub system that asynchronously delivers update notifications to all of the services that manage state.

이 때, 만약 높은 consistency를 제공하기 위해서는 동기적으로 업데이트 통지를 전달할텐데, availability와 latency를 떨어뜨리는 요인이 된다. 인과성을 위한 시스템을 사용하더라도 여러 서비스들 사이의 의존관계를 표현하는 것은 굉장한 복잡도를 가져오게 된다.

To provide strong consistency across our services, we would need to propagate update notifications synchronously impacting availability and latency. A causal system could avoid atomic updates, but would still need to be integrated into every service and threaded through every communication channel greatly increasing the complexity of the dependency graph.

Decentralized Access to Services

클라이언트는 한 화면에서 표시되는 여러 컨텐츠를 별도의 리퀘스트를 통해 가져와서 클라이언트에서 통합한다. 이 때문에 클라이언트는 일관되지 않은 결과들을 가지고 session-based guarantee를 보장함으로써 사용자에게 일관된 상태를 보여주어야 한다.

The Facebook web site and its mobile applications use parallelism and pipelining to reduce latency. The notification bar and the central content are fetched by separate requests, for example, and then merged at the client. Mobile applications also aggressively cache results using local storage, and then merge query results with previously seen values. […] but it leaves the user’s device as the only safe location to define session-based guarantees or demarcate isolation boundaries.

단일한 사용자는 보통 동일한 지역의 동일한 클러스터로 라우팅 되므로 이러한 성질은 사용자가 inconsistency를 경험할 확률을 낮추어준다. 하지만, 그러한 성질이 보장되는 것은 아니므로 역시 문제는 있다.

Requests for a single user are usually routed to the same cluster in the same region. This routing stability assists in reducing the number of user-visible inconsistencies, because most of the variance in asynchronous update propagation is inter-region.

Composing Results

조금 설명이 애매하기는 한데, 여러 서비스로부터의 결과를 조합해야할 경우, 그 사이의 의존관계를 다루어야 하는데, 각 서비스는 각자의 서비스를 위한 일부의 결과를 가지고 있기 때문에, 이를 일반적으로 처리하기가 쉽지 않다는 의미인 듯 하다.

Each service can be thought of as implementing its own limited query language, so it is not clear that the dependencies can be captured in a way that is both sufficiently generic and sufficiently precise.

Query Amplification

사용자 입장에서는 하나의 쿼리지만, 내부적으로는 여러 서비스들에 대한 수천개의 쿼리로 실행될 수 있다는 이야기.

For example, a user’s home page queries News Feed to get a list of stories, which depends on a separate service to ensure story order of previously seen content. These stories are then fetched from TAO to get the content, list of comments, likes, etc. In practice, this behavior produces fork/join query patterns that have fan-out degrees in the hundreds and critical paths dozens deep.

Slowdown Cascades

동일한 데이터가 각각의 서비스에서는 서로 다른 샤딩 방식과 인덱싱을 통해 저장되므로, 서로 다른 서비스들의 샤드 사이에 all-to-all ordering 의존 관계가 발생하고, 높은 consistency를 요구하는 시스템에서는 하나의 서비스에서 하나의 샤드만 실패 또는 지연되더라도 이에 의존하는 모든 다른 서비스의 모든 샤드에 영향을 줄 수 있다는 이야기.

Different services use different sharding and indexing schemes for the same data. For instance, TAO shards data based on a 64-bit integer id, News Feed shards based on a user id, and the secondary index system shards based on values. The use of different sharding schemes creates all-to-all ordering dependencies between shards in different services. This connectedness accelerates slowdown propagation in strongly consistent systems. A failure in one shard of one service would propagate to all shards of a downstream service.

Latency Outliers

여러 쿼리들을 조합해야하는 경우, 하나의 쿼리만 느리더라도 최종적인 응답에 영향을 미칠 수 있음. 이것이 strong consistency와 직접적인 연관성이 있는지는 명확하게 이해하기 어려우나, strongly consistent system의 응답속도나 outlier들이 그렇지 않은 시스템들에 비해 더 높기 때문에 언급되는 것 같음.

Parallel subqueries amplify the impact of latency outliers. A user request does not return until it has joined the results of all of its queries, and thus must wait for the slowest of its internal requests.

Linchpin Objects

데이터의 개수도 많고 매우 자주 읽히는데다 쓰기도 많은 무언가를 Linchpin Object라고 부르고 있다. 흔히 Facebook의 연예인 페이지나 유명 대기업의 페이지를 상상하면 될 것 같다. 역시 좋은 성능을 제공하는 것이 strongly consistent system에서는 중요하다고 얘기하고 있다.

The most frequently read objects are often also frequently written to and highly connected. Examples of these linchpin objects include popular users like celebrities, popular pages like major brands, and popular locations like tourist attractions. […] A system that provides stronger consistency must provide good performance for these linchpin objects.

Net Benefit to Users

stronger consistency는 프로그래머가 프로그램의 정확성에 대해 생각하기 편리하고, 사용자에게 일관성을 제공한다는 측면에서 이점을 가지고 있지만, 이를 정량적으로 측정하기는 어려울 뿐만 아니라, 그러한 이점들이 단점들 – 커뮤니케이션 오버헤드, 무거운 상태관리, 응답속도의 증가 – 을 넘어서지 않을지도 모름.

Systems with strong consistency are easier for programmers to reason about and build on top of [6, 12] and they provide a better experience for users because they avoid some anomalous application behavior. However, these benefits are hard to quantify precisely, and they do not appear in isolation. When all aspects of user experience are considered it might even be the case that the benefit does not outweigh the detriment. […] Stronger properties are provided through added communication and heavier-weight state management mechanisms, increasing latency. Although optimizations may minimize these overheads during failure-free execution, failures are frequent at scale. Higher latency may lead to a worse user experience, potentially resulting in a net detriment.

Operational Challenges

Fully Characterizing Worst-Case Throughput

여러 서비스가 얽혀있는 복잡한 시스템에서 실패나 복구 방법에 대해 미리 예측하거나 관찰하기 쉽지 않을 뿐더러 이를 정형화하기도 쉽지 않다는 얘기.

While there is no theoretical impediment to preserving worst-case throughput despite strengthening Facebook’s consistency guarantees, our experience is that failure and recovery behavior of a complex system is very difficult to characterize fully. Emergent behavior in cross-service interactions is difficult to find ahead of time, and may even be difficult to identify when it is occurring

Polyglot Environment

(아마도 백엔드) 서비스 내에서 C++이 가장 보편적으로 사용되는 언어인 것은 매우 신기함.

While C++ is the predominant language for standalone services at Facebook, Python, Java, Haskell, D, Ruby and Go are also supported by Thrift, our inter-service serialization format. The application code coordinating service invocations is generally written in Hack and PHP, and final composition of query
results may be performed by JavaScript in the browser, Objective C/C++ on IOS, or Java on Android.

여러 언어를 사용하고 있기 때문에 인터페이스나 쓰레딩 모델 등에 대해 가정하는 것이 어렵다는 이야기.

Ports or foreign function interfaces must be provided to give all of these codebases access to an end-to-end
consistency platform. Perhaps the trickiest part of this multi-language support is the wide variety of threading models that are idiomatic for different languages and runtimes, because it is likely the underlying consistency system will need to initiate or delay communication.

Varying Deployment Schedules

낮은 레벨의 서비스들은 매우 보수적인 deployment 프로세스를 가지고 있으므로, 이에 관련한 시스템을 빠르게 개발하는 것은 쉽지 않을거라는 이야기.

Facebook has an extremely aggressive deployment process for the stateless application logic, but we are necessarily more conservative with stateful and mature low level services. An inter-service coordination mechanism that is used by or interposes on these low-level services will have a deployment velocity that is constrained by the release engineering requirements of the most conservative component.

Reduced Incremental Benefit in a Mature System

consistency에 관해 이미 적용된 workaround들이 있으므로, 이를 일반적으로 개선하는 시스템의 이득이 떨어진다는 이야기.

While these workarounds would not exist in a newly built system, their presence in Facebook reduces the incremental benefits of deploying a generic system for stronger consistency guarantees.

Related Work

Prior Facebook Publications

  • memcache cache
    • B. Atikoglu, et al. Workload analysis of a large-scale keyvalue store. In SIGMETRICS, 2012.
    • R. Nishtala, et al. Scaling memcache at facebook. In NSDI, 2013.
  • TAO graph store
    • N. Bronson, et al. Tao: Facebook’s distributed data store for the social graph. In USENIX ATC, 2013.
  • Wormhole pub-sub system
    • Y. Sharma, et al. Wormhole: Reliable pub-sub to support geo-replicated internet services. In NSDI, May 2015
  • a characterization of load imbalance in the caches
    • Q. Huang, et al. Characterizing load imbalance in real-world networked caches. In HotNets, 2014.
  • an analysis of the messages use case
    • T. Harter, et al. Analysis of hdfs under hbase: A facebook messages case study. In FAST, 2014.
  • understanding and improving photo/BLOB storage and delivery
    • D. Beaver, et al. Finding a needle in haystack: Facebook’s photo storage. In OSDI, 2010.
      Q. Huang, et al. An Analysis of Facebook Photo Caching. In SOSP. ACM, 2013.
    • S. Muralidhar, et al. f4: Facebook’s warm blob storage system. In OSDI, 2014.
    • L. Tang, et al. RIPQ: Advanced Photo Caching on Flash For Facebook. In FAST, Feb. 2015.

Systems with Stronger Semantics

  • replicated state machines
    • L. Lamport. Time, clocks, and the ordering of events in a distributed system. Comm. ACM, 21(7), 1978.
    • F. B. Schneider. Implementing fault-tolerant services using the state machine approach: a tutorial. ACM Computer Surveys, 22(4), Dec. 1990.
  • causal and atomic broadcasts
    • K. P. Birman and R. V. Renesse. Reliable Distributed Computing with the ISIS Toolkit. IEEE Comp. Soc. Press, 1994.
  • systems designed for high availability and stronger consistency
    • K. Petersen, et al. Flexible update propagation for weakly consistent replication. In SOSP, Oct. 1997.
  • scalable causal consistency for datacenter-scale and/or geo-replicated services
    • S. Almeida, et al. Chainreaction: a causal+ consistent datastore based on chain replication. In EuroSys, 2013.
    • P. Bailis, et al. Bolton causal consistency. In SIGMOD, 2013.
    • J. Du, et al. Orbe: Scalable causal consistency using dependency matrices and physical clocks. In SOCC, 2013.
    • J. Du, et al. Gentlerain: Cheap and scalable causal consistency with physical clocks. In SOCC, 2014.
    • W. Lloyd, et al. Don’t settle for eventual: Scalable causal consistency for wide-area storage with COPS. In SOSP, Oct. 2011.
    • W. Lloyd,  et al. Stronger semantics for low-latency geo-replicated storage. In NSDI, Apr. 2013.
  • strong consistency for datacenter-scale and/or geo-replicated services
    • J. C. Corbett, et al. Spanner: Google’s globally distributed database. In OSDI, Oct. 2012.
    • R. Escriva, et al. HyperKV: A distributed, searchable key-value store for cloud computing. In SIGCOMM, 2012.
    • L. Glendenning, et al. Scalable consistency in Scatter. In SOSP, Oct. 2011.
    • C. Li, D. Porto, A. Clement, J. Gehrke, N. Preguic¸a, and R. Rodrigues. Making geo-replicated systems fast as possible, consistent when necessary. In OSDI, Oct. 2012.
    • J. Ousterhout, P. Agrawal, D. Erickson, C. Kozyrakis, J. Leverich, D. Mazi`eres, S. Mitra, A. Narayanan,
      M. Rosenblum, S. M. Rumble, E. Stratmann, and R. Stutsman. The case for ramcloud. ACM SIGOPS Operating Systems Review, 43(4), 2010.
    • D. B. Terry, V. Prabhakaran, R. Kotla, M. Balakrishnan, M. K. Aguilera, and H. Abu-Libdeh. Consistency-based service level agreements for cloud storage. In SOSP, 2013.
  • transactions for datacenter-scale and/or geo-replicated services
    • P. Bailis, et al. Scalable atomic visibility with ramp transactions. In SIGMOD, 2014.
    • J. Baker, et al. Megastore: Providing scalable, highly available storage for interactive services. In CIDR, Jan. 2011.
    • J. C. Corbett, et al. Spanner: Google’s globally distributed database. In OSDI, Oct. 2012.
    • T. Kraska, et al. Mdcc: Multi-data center consistency. In EuroSys, 2013.
    • W. Lloyd, et al. Stronger semantics for low-latency geo-replicated storage. In NSDI, Apr. 2013.
    • S. Mu, et al. Extracting more concurrency from distributed transactions. In OSDI, 2014.
    • Y. Sovran, et al. Transactional storage for geo-replicated systems. In SOSP, Oct. 2011.
    • A. Thomson, et al. Calvin: fast distributed transactions for partitioned database systems. In SIGMOD, May 2012.
    • C. Xie, et al. Salt: combining acid and base in a distributed database. In OSDI, 2014.
    • Y. Zhang, et al. Transaction chains: achieving serializability with low latency in geo-distributed storage systems. In SOSP, pages 276–291. ACM, 2013.

Discussions of Challenges to Stronger Consistency

  • criticism of enforcing an order in a communication substrate instead of end to end
    • D. R. Cheriton, et al. Understanding the limitations of causally and totally ordered communication. In
      SOSP, Dec. 1993.
  • slowdown cascades to motivate enforcing only an explicit subset of causal consistency
    • P. Bailis, et al. The potential dangers of causal consistency and an explicit solution. In SOCC, 2012.
  • detail the importance and challenges for tail latency in high-scale services at Google
    • J. Dean, et al. The tail at scale. Comm. ACM, 56(2), 2013.

My Opinion

Strongly consistent system의 연구자들에게 방향을 제시하기 위해 Facebook의 시스템이라는 도메인 내에서 발생하는 숙제들을 정의하고 있는 논문이라고 볼 수 있는데, 마치 Facebook에서 strongly consistent system을 일반적으로 도입해서는 안되는 이유들을 정리해놓은 듯한 느낌이 들었다. 오히려 나의 관심은 Facebook에서 데이터의 복제에 관한 시스템과 복제본 사이의 consistency 문제를 어떻게 다루고 있는가 였는데, 우리가 고심하고 있는 문제들이나 해결 방향이 상당히 닮아 있다는 인상을 받았다. 여러 서비스에 걸친 업데이트 통지를 위해 인프라로서 pub-sub을 적극적으로 활용하고 있는 점이나, 서비스별로 소셜 그래프의 별도 인덱스나 캐시 등을 잘 구축해서 활용하고 있는 점은 잘하고 있다고 생각했다. Facebook의 graph storage인 TAO, pub-sub 시스템인 Wormhole 등에 대해 관심이 생겼다.

Paper: Challenges to Adopting Stronger Consistency at Scale 더 읽기"

Article: An Inside Look at Facebook’s Approach to Automation and Human Work

An Inside Look at Facebook’s Approach to Automation and Human Work

Facebook의 VP of Engineering인 Jay Parikh와의 자동화에 관한 인터뷰.

하드웨어의 실패에 따른 자동화된 진단과 복구를 위한 FBAR라는 자동화 시스템에 대해 이야기하고 있다.
FBAR에 대해서는 조금 오래된 글 (2011년)이지만, 다음 글에서 조금 더 자세한 내용을 볼 수 있다.
https://www.facebook.com/notes/facebook-engineering/making-facebook-self-healing/10150275248698920

We built a system we call FBAR, for Facebook Auto Remediation, to do a very basic set of hardware remediation tasks. Before, if a server had a hard drive failure or some hardware error, an alarm would go off and some human would have to log in, or walk to the computer, and try to debug or fix it. You’d do some things to try to fix it in software, you’d reboot the machine, you might try to reimage it. A lot of that software remediation and debugging is all automated now. No person has to be involved with that. The system will detect the error– it could be a disk drive, it could be a CPU, it could be a networking card or power failure—and can go ahead and do a bunch of things that it knows how to do.

당연한 이야기이지만, 자동화의 이점 중 하나는 휴먼 에러를 방지하는 것이다.

You don’t have to worry about that with automation, because work consistently gets done in the same way every time. And cluster turn-ups which used to take three or four months now can be done in the course of a week – sometimes in just a couple of days.

자동화를 통한 효율에 관한 이야기인데, 일반적인 IT 회사에서 200~500대의 서버당 1명이 일하고 있다면, Facebook에서는 25,000대당 1명의 효율을 가지고 있다고… 예전에도 들은 바가 있는 것 같지만, 2000대를 1명이 담당하는 정도가 되니 불안함을 느꼈던 것을 생각하면 대단한 것 같기는 하다.

So we have done a lot of work on the software automation side of this, to the point that we only need one technician in the data center for every 25,000 servers. That is a ratio that is basically unheard of. Most IT shops have ratios of one to 200, or one to 500.

이 인터뷰에서 자동화의 이점으로 가장 강조하고 있다고 느끼는 점인데, 상당히 단순한 부분들을 자동화하고 나면, 똑독한 사람들이 더욱 높은 수준의 일을 할 수 있고 미래에 대해서 생각할 수 있도록 한다는 것이다.  그리고, 그런 똑똑한 사람들이 오랜 시간에 걸쳐 평범한 일을 하게 된다면 결국 번아웃과 불행함으로 이어지고, 결국 이직을 하게 마련인데, 업무가 지루하고 반복적인 단순 업무가 되지 않도록 지속적으로 자동화를 하고 아키텍쳐를 바꿔나가야한다고 얘기하고 있다.

I think one of the major ways you do this is by continuing to keep them out of their comfort zone. If they end up doing “humdrum” work for a long time, they’re not learning anything, yet they’re spending a lot of time doing it. That leads to burnout and unhappiness, and then they’re going to go somewhere else. So, I think, if growing your company depends on keeping these brilliant technologists engaged, the necessity is that you have to keep automating and rearchitecting your systems so that things don’t become boring, monotonous, and repetitive. Automation serves your talent objectives.

변화를 위한 조직 구조에 대해 – 변화를 위한 팀과, 안정적이고 효율적인 운영을 위한 팀을 분리하는 것이 전통적인 접근.

Otherwise, you put them in a position where one team, in order to hit its goals, always wants to make changes, and another team only wants to make everything stable and cost-effective. Those two are completely opposed to each other. […] When I showed up at Facebook in 2009, this is what I saw and I thought is was perfectly reasonable. It was this way when I was at Akamai, it was this way when I got to Ning.

문제는 단기적인 비용에 기반해 의사결정을 하기 때문에, 미래를 위한 투자를 하지 않게 되고, 자동화는 작은 팀의 임무로 맡겨지기 때문에, 수많은 엔지니어들의 요구에 따라가지 못하게 되는 것.

Sometimes we were making decisions that were short-term cost based, when we should have been basing them on what we would need to be ready for a year from now. In other places, the focus on automation wasn’t there, because it wasn’t a team’s own responsibility. Questions about automation were being tossed to a small team of people who just weren’t going to keep up with this swarm of engineers that we were hiring.

Facebook에서는 이를 같은 팀에 묶고 변화와 운영을 동시에 하도록 하고 있다. 이 또한 쉬운 일은 아닌데, 인터럽트들이 장기적인 목표에서 벗어나도록 하기 때문. 데드라인에 대해서 조금 유연하게 대처할 필요가 있다고 얘기하고 있다.

No, we come up with what we call big bets as a team, planning the investments in technology that will take one or two or three years to build. For example, we built a new compiler that runs the front end of our site. It took us a couple of years to build and that R&D effort was done in parallel, in the same team that was maintaining the existing run time that was running the live Facebook.

이러한 방식의 장점에 대해서, 혁신을 위한 팀은 변화에 대한 성과에 대해서 걱정하고, 전선에서 뛰는 비즈니스 팀은 언제쯤 재미있는 일을 할 수 있는지에 대한 불만을 표하는 상황 등을 방지할 수 있다고 얘기하고 있다.

But meanwhile there are so many benefits — starting with the fact that it’s done in a much more open way. No one likes it when there’s a team working in some secure undisclosed location “doing something really cool that is going to replace what you’ve got.” It’s a tough thing to manage on both ends. The innovation team worries: “We’re not making any impact now – they’ve just stuffed us in a corner and told us go deliver something great.” And the core business team you’re depending on to deal with problems and customer support issues and all the urgent things that come up, is saying, “When do we get to work on something cool? Are we second-class citizens or something?”

의견

자동화의 이점에 대해서는 100% 동감하고 적어도 IT 업계의 일정 규모 이상의 회사라면 어떤 조직이라도 항상 중요한 목표가 되어야 한다고 생각한다. 변화팀과 운영팀을 통합하는 조직 구조에 대해서는 전반적으로는 동의하나, 아마도  VP 레벨의 입장에서는 통합된 조직일지는 몰라도 실무 수준에서는 이 글에서도 제시하듯이 인터럽트와 단기적인 비용에 기반한 요구들은 장기적인 관점에서의 업무에 매우 해롭기 때문에, 일정 규모 이상의 조직이라면, 정도의 차이는 있더라도 팀 내에서라도 어느 정도의 분리는 필요하지 않는가라고 생각한다. 다만, 변화팀과 운영팀이 서로 얼굴을 맞대고 이야기할 수 없을 정도로 서로 다른 조직의 바운더리로 분리되어 있다면, 이 글에서 지적된 것과 같은 부정적인 효과는 항상 발생할 것이다.

Article: An Inside Look at Facebook’s Approach to Automation and Human Work 더 읽기"

Paper: Dotted version vectors: Logical clocks for optimistic replication (Part 1)

PREGUIÇA, Nuno, et al. Dotted version vectors: Logical clocks for optimistic replication. arXiv preprint arXiv:1011.5808, 2010.

Dynamo, Cassandra, Riak, Voldemort와 같은 시스템들은 쓰기 가용성 (write availability)을 보장하기 위해, 어떤 하나의 데이터 항목의 여러 복제본이 동일한 데이터 항목의 복제본이 서로 다른 값으로 갈라질 (diverge) 수 있고, 이를 나중에 수리(repair)하는 방법을 고안하고 있다. 이 때, 복제본 버전들을 비교해서 어느 한쪽이 새로운 업데이트라서 다른 복제본들을 교체할 수 있는지, 아니면 동시적 (concurrent)이라서 한지를 semantic reconciliation을 필요로하는 지를 결정할 수단을 필요로 한다. 이러한 비교는 업데이트 사이의 인과적인 의존성 (causal dependency)에 따라 판단하는데, 인과적인 이력 (Causal History)을 모두 기록하는 것은 비효율적이므로, 이 정보를 요약하는 동시에 인과적인 의존성을 따질 수 있는 수단으로 람포트 시계(Lamport Clock)나 버전 벡터(Version Vector) 등의 개념들이 제안되고 실제로 위와 같은 시스템들에서 활용되고 있다.

이 논문에서 도입하고 있는 Dotted Version Vector는 인과적인 이력을 요약하기 위한 이러한 수단들이 가진 정확성(Correctness)의 한계를 개선하고, 동시에 확장성(Scalability)을 가지는 해결 방법으로서 제시되고 있다.

우선 이번 글에서는 논문의 전반부에 소개된, 인과성의 추적을 위해 사용되는 여러 방법들과 한계들에 대해서 알아보도록 하자.

1. 인과적인 이력 (Causal Histories)

causal_histories

우리가 다루고자 하는 클라이언트-서버로 구성된 스토리지 시스템에서, 클라이언트가 서버로 어떤 작업을 요청할 때마다  인과적인 관계가 발생한다. 이러한 관계를 formal하게 표현하기 위한 방법 중 하나가 인과적인 이력 (Mattern, 1994)이다. 인과적인 이력은 업데이트라는 사건들에 대한 유일한 식별자(identifier)의 집합으로 표현할 수 있다. 이 때 업데이트 사건의 식별자는 노드(리플리카 또는 클라이언트)의 식별자와 단조증가하는 카운터를 합쳐서 만들 수 있다. 업데이트가 발생할 때마다 새로운 식별자가 식별자의 집합에 추가하게 되고, 이 집합의 포함관계를 비교해서 인과성의 부분순서(partial order)를 추적할 수 있다. 두 집합이 서로를 포함하고 있지 않다면 동시적이라고 할 수 있다.

인과적인 이력은 개념적으로는 단순하지만, 업데이트에 따라서 식별자의 집합이 선형적으로 커지게 되므로, 실용적인 시스템에 사용하는 것은 적절하지 않다.

2. 인과적으로 호환되는 완전 순서 (Causally compliant total order)

real_time_clock

업데이트들 사이의 인과적인 의존성과 호환되는 완전 순서를 정할 수 있다면, 이를 이용해 last writer wins 정책을 적용할 수 있다. 다른 방식들과 달리, 복제본 노드 (Replica Node)는 하나의 값만 유지하면 되고, 쓰기를 위해 읽기 문맥 (get context)에 해당하는 정보를 제공할 필요가 없기 때문에 매우 단순한 시스템을 얻을 수 있다는 장점이 있다. 문제는 이러한 완전 순서는 인과적인 의존성과 호환되지만, 동시적인 (concurrent) 업데이트들도 정렬해버리기 때문에, 어떤 동시적인 업데이트는 last writer wins 정책에 의해 잃어버리게 된다는 것이다.

2.1. 물리적인 시계

클라이언트들의 시계들이 잘 동기화된다면, 업데이트들을 물리적인 시계의 시간 순서에 따라 정렬함 (동시에 일어난 사건은 프로세스 ID를 이용해 정렬)으로써 완전 순서를 얻을 수 있다. 물리적인 시계에 기반한 완전 순서를 사용하는 방식은 Cassandra 0.6.x나 Dynamo에서 일부 애플리케이션에 대해 버전 벡터의 대안으로 사용되었다고 한다.

물리적인 시계에 기반할 경우의 문제점은 역시 클라이언트들의 시계들이 동기화에서 벗어날 때 발생한다. 시계가 느린 복제본 노드나 클라이언트는 항상 동시적인 업데이트 사이의 경쟁에서 지기 때문에, 항상 동시적인 업데이트를 잃어버리는 문제가 발생한다.

2.2. 램포트 시계 (Lamport Clock)

예전의 글에서 소개했듯이 램포트 시계는 역시 완전 순서를 위해 사용될 수 있는 논리적인 시계의 작동 방식을 제공하고 있다.

3. 서버별 항목을 가진 버전 벡터 (Version vectors with per-server entry)

version_vectors_with_per_server_entries

서버별 항목을 가진 버전 벡터를 이용해 인과성을 추적하는 방법은 다음과 같이 작동한다.

  1. 클라이언트가 GET을 실행할 때 값에 반영된 사건들의 인과적인 이력를 나타내는 버전 벡터를 받음.
  2. 그 클라이언트가 PUT을 실행할 때, 이전의 GET에서 받았던 버전 벡터를 함께 보냄.
  3. PUT을 실행하는 서버는 새로운 업데이트를 반영하기 위해 로컬 카운터를 증가시키고, 서버의 식별자에 해당하는 버전 벡터의 항목에 저장.
  4. 새로운 버전 벡터를 서버에 저장되어 있는 다른 버전 벡터와 비교하고, 낡은 버전들을 모두 버린다.

서로 다른 서버들 사이의 업데이트들 사이의 인과성을 추적하는 것이 가능하지만, 동일한 서버에서 발생한 업데이트들 사이의 인과성을 추적할 수 없다. 즉, 동일한 서버에서 발생한 동시적인 업데이트는 또 다시 last writer wins 정책이 적용되므로, 적어도 하나의 동시적인 업데이트는 잃어버릴 수 밖에 없다. Plausible Clocks에서 설명하듯이, 이러한 문제의 본질적인 원인은, 동시적인 업데이트를 발생시키는 근원에 해당하는 클라이언트의 수에 비해서 적은 수의 버전 벡터 항목을 사용하기 때문이다.

이러한 방식은 Dynamo가 사용하고 있다.

4. 클라이언트별 항목을 가진 버전 벡터 (Version vectors with per-client entry)

version_vectors_with_per_client_entries

서버별 항목을 가진 버전 벡터에서 살펴본 문제를 해결하기 위한 가장 자연스러운 접근 중 하나는 클라이언트별 항목을 가진 버전 벡터를 사용하는 것이다. 동작 방식은 다음과 같다.

  1. 클라이언트가 GET을 실행할 때 값에 반영된 사건들의 인과적인 이력을 나타내는 버전 벡터를 받음.
  2. 그 클라이언트가 PUT을 실행할 때, 이전의 GET에서 받았던 버전 벡터, 그리고 클라이언트의 식별자와 클라이언트별로 단조증가하는 카운터를 함께 보냄.

이 방식은 서로 다른 클라이언트들에 의해 발생한 동시적인 업데이트들 사이의 인과성을 완전히 추적할 수 있지만, 클라이언트 당 하나의 항목을 필요로 하기 때문에, 버전 벡터들의 크기가 클라이언트들의 수에 비례하게 되고, 이 방식을 실용적으로 사용할 수 없게 만드는 원인이 된다.

 

Paper: Dotted version vectors: Logical clocks for optimistic replication (Part 1) 더 읽기"

Kafka: a Distributed Messaging System for Log Processing

Kafka: A distributed messaging system for log processing In Proceedings of 6th International Workshop on Networking Meets Databases (NetDB), Athens, Greece (2011) by J. Kreps, N. Narkhede, J. Rao

Kafka는 기존의 메시징 시스템에서 당연하다고 가정하고 있었지만, 로그 처리 시스템에서는 필요없는 보장들을 과감하게 버리고, 성능 위주의 설계를 함으로써, 실제로 링크 속도에 육박하는 성능을 보여주고 있고, 상당히 단순한 아키텍쳐를 유지하고 있는 점이 흥미로운 점입니다. 그리고, 현실의 데이터 처리에 있어서는 버그나 장애로 인해 흔히 발생하는 재처리가 pull 기반의 소비 모델을 도입함으로써 매우 쉽게 가능해졌다는 점도 눈여겨 볼 부분입니다.

주의: 이 글은 2011에 출판된 Kafka의 페이퍼 내용을 다루고 있으며, 현재의 Kafka 버전의 내용을 다루고 있지 않습니다.

Problem

기존의 엔터프라이즈 메시징 시스템들의 한계를 다음과 같은 이유들로 설명하고 있습니다.

  • 배달 보장 (delivery guarantee)을 위한 기능들은 로그 처리를 위한 시스템 입장에서는 불필요..
  • 처리속도 (throughput)를 디자인 제약으로 고려하지 않음.
  • 분산에 대한 고려가 부족.
  • 메시지가 즉시 소비되는 것을 전제로 하기 때문에 메시지가 쌓일 경우 성능이 하락.

마찬가지로 최근에 만들어진 로그 처리 시스템들 – Scribe, Yahoo’s data highway project, Flume의 한계는 다음과 같은 이유들로 설명하고 있습니다.

  • 로그 데이터를 오프라인으로 처리하는 것을 위해 만들어진 시스템.
  • 대부분은 push 모델.

Kafka Architecture and Design Principles

Kafka의 주요 개념들과 개략적인 디자인은 다음과 같습니다.

kafka_architecture

  • 토픽 (topic): 특정한 타입의 메시지의 스트림
  • 프로듀서 (producer): 어떤 토픽에 대해 메시지를 발행 (publish)할 수 있음.
  • 프로듀서에 의해 발행된 메시지들은 브로커 (broker)라는 서버들에 저장됨. Kafka 클러스터는 일반적으로 여러 브로커들로 이루어짐.
  • 컨수머 (consumer)는 브로커로부터 1개 이상의 토픽을 구독할 수 있고, 브로커들로부터 데이터를 당김(pull)으로써 구독한 메시지들을 소비 (consume)할 수 있음.
    • point-to-point 배달 모델: 여러 컨수머가 하나의 토픽 내의 메시지를 하나씩 소비.
    • 발행/구독 (publish/subscribe) 모델: 여러 컨수머가 하나의 토픽의 각자의 복제본 메시지들을 소비.
  • 토픽은 여러 파티션들로 이루어져 있고, 각각의 브로커는 하나 이상의 파티션을 저장.

Efficiency on a Single Partition

Simple storage

  • 어떤 토픽의 각 파티션은 논리적인 로그에 해당.
  • 물리적으로 로그는 거의 동일한 크기의 세그먼트 파일들의 집합으로 구현됨.
  • 프로듀서가 어떤 파티션에 메시지를 발행할 때마다 브로커는 마지막 세그먼트 파일에 메시지를 append 한다.
  • 성능을 위해서 특정 개수의 메시지가 발행되거나 특정 시간이 흐른 후에 세그먼트 파일을 디스크로 flush한다.
    • 메시지는 flush가 된 후에 컨수머에게 노출됨.
  • 전형적인 메시징 시스템과 달리, 카프카의 메시지에는 메시지 식별자가 없고 로그 상의 논리적인 오프셋 (offset)을 주소로 사용.
    • 메시지 식별자로부터 메시지를 찾기 위한 인덱스 구조를 유지할 필요가 없어짐.
    • Kafka의 메시지 식별자 – 로그 상의 논리적인 오프셋은 자연히 증가하지만, 연속적이지는 않음.
  • 컨수머는 항상 특정한 파티션으로부터 연속적으로 메시지를 소비함.
  • 컨수머가 특정한 메시지 오프셋에 대해 ack을 한다면, 이는 그 파티션의 해당 오프셋 이전의 모든 메시지를 받았음을 의미.
  • 컨수머는 브로커에게 비동기적인 당김 (pull) 요청을 보낸다. 각 요청은 소비할 메시지의 오프셋과 최대로 가져올 메시지의 크기를 포함.
    • 브로커는 모든 세그먼트 파일의 첫번째 메시지의 오프셋들의 목록을 메모리 상에 유지.
    • 브로커는 그 목록을 세그먼트 파일을 찾고, 컨수머에게 데이터를 보낸다.
    • 컨수머가 메시지를 받은 후에는 다음 메시지의 오프셋을 계산해서 다음 요청에서 사용.

kafka_log

Efficient transfer

  • 프로듀서는 한번의 send 요청에 여러 메시지를 보낼 수 있음.
  • 컨수머 API는 한번에 하나의 메시지를 소비하는 것처럼 보이지만, 내부적으로는 각 당김 요청은 특정 사이즈 (보통 수백 KB) 내에 해당하는 여러 메시지를 가지고 옵니다.
  • 메시지들을 직접 메모리 상에 캐싱을 하지 않고 파일 시스템 페이지 캐시에 의존함.
    • 중복된 버퍼링을 피함.
    • 프로세스 내에서 메시지를 캐시하지 않기 때문에 가비지 컬렉션의 오버헤드가 매우 적어짐.
    • 프로듀서와 컨수머는 세그먼트 파일을 순차적으로 액세스하고, 보통 컨수머는 프로듀서보다 살짝 뒤쳐지기 때문에, 보통의 OS 캐싱 휴리스틱 (write through, read-ahead)이 매우 효과적으로 동작함.
  • sendfile API를 이용해서 복사와 시스템 콜 회수를 줄임.

Stateless broker

  • 대부분의 다른 메시징 시스템과 다르게, 각각의 컨수머가 얼마나 소비했는지는 브로커가 유지하지 않음.
    • 이러한 디자인은 브로커의 복잡도와 오버헤드를 줄임.
  • 문제는 이미 소비한 메시지를 언제 지울지 결정하는 것이 어려워짐.
    • Kafka는 시간 기준의 SLA를 사용해서 해결하는데, 특정 기간 (보통 1주일) 이상이 지나면 메시지는 자동적으로 삭제됨.
      • 대부분의 컨수머는 비교적 짧은 시간 단위 – 매일, 시간별, 실시간 – 내에 소비를 마치기 때문에 문제가 없음.
      • 카프카는 데이터가 크기에 따라서 성능이 저하되지 않으므로 이러한 방법이 가능.
  • 부가적인 이점으로, 컨수머는 이전의 오프셋으로 되돌아가 메시지를 다시 소비하는 것이 가능해짐.
    • 예를 들어, 컨수머의 로직에 에러가 있을 경우, 에러가 수정된 후에 문제가 된 메시지들을 다시 처리하는 것이 가능.
    • 컨수머가 크래시한다면 flush되지 않은 데이터는 잃어버리겠지만, flush하지 않은 가장 작은 오프셋부터 메시지를 다시 소비하는 것이 가능.
    • 이러한 메커니즘은 push 모델보다 pull 모델에서 훨씬 쉬움.

3.2 Distributed Coordination

  • 각각의 프로듀서는 랜덤하게 선택된 파티션이나 파티셔닝 키와 파티셔닝 함수에 의해 결정된 파티션으로 메시지를 발행할 수 있다.
  • 컨수머 그룹 (consumer group)이라는 개념
    • 토픽의 집합을 consume하는 하나 이상의 컨수머들로 구성.
    • 서로 다른 컨수머 그룹은 독립적으로 구독한 메시지들의 전체 집합을 consume하고, 컨수머 그룹 끼리는 조정?(coordination)이 불필요.
  • 어떤 토픽 내에서 하나의 파티션은 병행성의 최소 단위
    • 주어진 시점에 하나의 파티션으로부터의 모든 메시지는 각 컨수머 그룹 내에서 하나의 컨수머에 의해서만 소비됨.
    • 하나의 파티션을 여러 컨수머가 동시에 소비하도록 했다면 누가 어떤 메시지를 소비할지는 조정해야하는 오버헤드가 발생.
    • 로드를 균형 있게 맞추기 위해서는 파티션의 수는 컨수머의 수보다 더 많아지도록 유지.
  • 시스템을 단순화하기 위해 중앙의 마스터 노드를 가지지 않도록 함.
    • 조정을 위해서 Zookeeper를 사용.
    • Kafka가 ZooKeeper를 사용하는 작업들
      • 브로커와 컨수머의 추가와 제거를 탐지
      • 브로커나 컨수머가 추가/제거될 경우 각 컨수머에게 리밸런스 프로세스를 트리거.
      • 소비 관계를 유지하고, 각 파티션의 소비된 오프셋을 추적
        • 브로커나 컨수머가 시작하면 주키퍼의 브로커 또는 컨수머 레지스트리에 그 정보를 저장.
        • 브로커 레지스트리는 브로커의 호스트 이름과 포트, 토픽과 파티션의 집합을 포함.
        • 컨수머 레지스트리는 컨수머가 속한 컨수머 그룹과, 구독하고 있는 토픽들의 집합을 포함.
        • 각 컨수머 그룹은 소유권 레지스트리와 오프셋 레지스트리와 연관됨.
        • 소유권 레지스트리는 모든 구독된 파티션에 대해 하나의 ZooKeeper 경로를 가지며, 그 값은 현재 그 파티션을 소비하고 있는 컨수머의 ID.
        • 오프셋 레지스트리는 각각의 구독된 파티션에 대해 파티션 내에서 마지막으로 소비된 오프셋을 저장.
      • 브로커 레지스트리, 컨수머 레지스트리, 소유권 레지스트리에 대한 주키퍼 경로들은 ephemeral 경로.
      • 오프셋 레지스트리에 대해서는 persistent 경로.
      • 브로커가 실패하면 그 위의 모든 파티션은 자동적으로 브로커 레지스트리에서 삭제됨.
      • 컨수머의 실패는 컨수머 레지스트리에서 컨수머에 해당하는 항목과 소유권 레지스트리에서 그것이 가진 모든 파티션을 삭제.
    • 각 컨수머는 브로커와 컨수머 레지스트리에 대해 ZooKeeper Watcher를 등록하고 브로커나 컨수머 그룹에서 변화가 일어나면 통지를 받음.
    • 컨수머의 시작이나 컨수머가 브로커/컨수머 변화에 대해 통지를 받으면, 컨수머는 소비할 파티션의 부분집합을 정하기 위해 리밸런스 프로세스를 시작.
      • 브로커와 컨수머 레지스트리를 읽어서 각 구독된 토픽에 대해 사용가용한 파티션의 집합과 그 토픽을 구독하고 있는 컨수머의 집합을 계산.
      • 파티션 집합을 범위로 파티셔닝(range-partition)해서 컨수머의 수 만큼의 청크로 나누고 소유할 청크를 고름.
      • 컨수머가 고른 각 파티션에 대해 소유권 레지스트리에 그 파티션의 새로운 소유자로 스스로를 써넣음.
      • 컨수머는 각 소유한 파티션으로부터 데이터를 당겨오는 쓰레드를 시작. 오프셋 레지스트리에 저장된 오프셋으로부터 읽기 시작.
      • 컨수머는 주기적으로 오프셋 레지스트리에 마지막으로 소비한 오프셋을 업데이트.
    • 그룹 내에 여러 컨수머가 있을 때 각각은 브로커/컨수머 변경에 대해 통지를 받지만, 그 통지가 컨수머마다 살짝 다르게 올 수 있음.
      • 하나의 컨수머가 다른 컨수머에 의해 아직 소유된 파티션에 대한 소유권을 가지려고 하는 것이 가능.
      • 이 경우, 첫번째 컨수머는 단순히 모든 파티션을 릴리즈하고 조금 기다린 후 리밸런스 프로세스를 재시도한다.
      • 리밸런스 프로세스는 단 몇번의 재시도 만에 안정화됨.
    • 새로운 컨수머 그룹이 만들어졌을 때는 오프셋 레지스트리에는 오프셋이 없지만, API를 사용해서 최소 또는 최대의 오프셋부터 시작할 수 있음.

Delivery Guarantees

  • Kafka는 at-least-once 배달만을 보장함.
    • exactly-once 배달은 보통 2PC를 필요로 하고, 카프카가 대상으로 하는 애플리케이션은 필요로 하지 않음.
    • 보통의 경우, 메시지는 각 컨수머 그룹에 정확히 한번만 배달됨. 하지만, 컨수머 프로세스가 정상적으로 종료되지 않고 죽은 경우, 그 컨수머가 소유하고 있던 파티션을 새롭게 소유하게된 컨수머는 ZooKeeper에 커밋된 마지막 오프셋 후의 메시지들에 대해 중복 메시지를 소비하게 됨.
    • 애플리케이션에 있어서 메시지의 중복이 중요한 문제라면, Kafka가 컨수머에게 돌려주는 오프셋을 이용하거나 메시지 내의 어떤 유일하게 식별 가능한 키를 사용해서 자체적인 중복 방지 로직을 추가해야함.
  • Kafka는 하나의 파티션으로부터의 메시지는 컨수머로 차레대로 전달되는 것을 보장. 하지만 서로 다른 파티션으로부터 오는 메시지의 순서는 보장하지 않음.
  • 로그가 망가지는 것을 피하기 위해 각 메시지에 대해 CRC를 저장.
    • Kafka는 CC를 가진 메시지를 삭제하는 리커버리 프로세스를 실행.
    • 이 CRC는 네트워크 에러를 체크할 수도 있게 해줌.
  • 브로커가 죽으면 그것 내에 저장된 소비되지 않은 메시지는 사용불가능해짐.
    • 미래에는 여러 브로커에 메시지를 중첩해서 저장하는 자체적인 복제를  추가할 예정.

Experimental Results

Producer test

producer_performance

Kafka의 Producer 성능이 좋은 이유를 아래와 같은 이유로 설명하고 있습니다.

  • Kafka 프로듀서는 브로커로부터의 ack을 기다리지 않고 브로커가 다룰 수 있는 만큼 빠르게 메시지를 보냄.
    • 배치 크기가  50일 때는 Kafka 프로듀서는 프로듀서-브로커 사이의 1GB 링크에 포화됨. (saturated)
    • ack이 없으므로 발행된 모든 메시지가 실제로 브로커에 의해 받아진 건지 보장할 수 없음.
    • 로그 데이터에 대해서는 영속성을 처리속도와 트레이드를 하는 것이 좋음.
    • 미래에 영속성이 중요한 데이터에 대한 개선을 계획 중임.
  • 효율적인 스토리지 형식
    • 평균적으로 Kafka에서는 각 메시지는 9바이트의 오버헤드를 가짐. 반면, ActiveMQ에서는 144 바이트.
    • ActiveMQ에서의 오버헤드는 JMS가 요구하는 무거운 메시지 헤더에서 옴.
    • 여러 인덱싱 구조를 유지하기 위한 비용도 존재.
  • 배치 방식은 RPC의 오버헤드를 감추어서 (amortize) 처리속도를 크게 개선함.

 

Consumer Test

consumer_performance

Kafka가 좋은 성능을 보여주는 이유를 다음과 같이 설명하고 있습니다.

  • 효율적인 스토리지 포맷
  • ActiveMQ, RabbitMQ는 모든 메시지의 배달 상태를 유지하기 위해 꾸준히 쓰기가 발생하지만, Kafka의 브로커는 소비를 위한 디스크 쓰기가 발생하지 않음.
  • sendfile API의 사용.

Future Works

Kafka가 미래에 지원할 기능으로 우선 영속성을 위한 자체적인 복제 지원을 들고 있습니다. 비동기-동기 복제를 둘다 지원하려고 하고, 애플리케이션의 여러가지 요구에 따라 적절한 중첩 레벨을 선택할 수 있도록 하려 한다고 합니다. 다른 하나는 스트림 프로세싱입니다. 기본적인 레코드 수를 센다든지, 다른 스트림이나 다른 스토리지의 데이터와 join하는 등의 처리가 가능하게 하려고 합니다. 우리는 현재의 Kafka는 스트림 프로세싱을 지원하지 않고, Kafka와 협동하는 다른 프레임워크에 위임하는 방식으로 이루어지고 있다는 것을 알고 있습니다.

Kafka: a Distributed Messaging System for Log Processing 더 읽기"

Finding a needle in Haystack: Facebook's photo storage

Finding a needle in Haystack: Facebook’s photo storage by Doug Beaver, Sanjeev Kumar, Harry C. Li , Jason Sobel , Peter Vajgel , Facebook Inc, 2010.

최근 몇개월 동안  상대적으로 크기가 작고 많은 파일들을 비교적 단순한 솔루션을 이용해서 효율적으로 저장하는 것에 대해서 관심이 있어서 읽어본 페이퍼이다. 파일들의 개수로 인한 메모리 사용량과 복제 시간 등이 골칫거리라서 커다란 파일에 저장하는 방법을 여러가지로 검토하고 있었는데, 매우 실용적이고 깔끔한 접근으로 문제를 해결하고 있고, 실제로 개발 비용도 적게 (몇개월 정도 걸렸다고 한다) 들었으므로, 2009년 경의 시스템이지만, 서비스를 다루는 엔지니어로서는 배울 점이 있다고 생각한다.  이미지 파일 자체가 불변 데이터이므로 상대적으로 쉬운 문제인 것은 맞지만, 실제로 서비스에 적용하는 것은 또 다른 문제이다. 오픈소스화 되어 있지 않은 것이 아쉬운 점이라고 할 수 있는데 – 실리콘 밸리의 큰 회사들 치고는 의외로 Facebook은 오픈소스에 그렇게 열려있지 않은 문화인 것 같다 –  구현이 비교적 단순하기 때문인지 수많은 Haystack Clone들이 있다. 혹시 좋은 구현을 찾게되면 소개할 기회가 있을지도 모르겠다.

Problem – NFS Approach

Facebook은 NAS와 NFS를 이용하는 사진 스토리지를 구축하고 있었으며 다음과 같은 문제를 인식하고 있었다.

  • 사진을 저장할 때 전통적인 POSIX 파일시스템에서 요구하는 파일 메타데이터는 필요하지 않음.
  • 사진 파일을 읽기 위해서 3번의 디스크 오퍼레이션이 필요하고 메타데이터의 액세스가 병목이 됨.
    • 디렉토리의 blockmap이 너무 커서 효과적으로 캐시되지 않음.
  • Hot한 액세스는 CDN으로 다룰 수 있지만, long tail 성격을 가진 액세스가 있으므로, 캐싱만으로 해결할 수 없음.
  • 대체 기술들도 적합하지 않음.
    • MySQL, Hadoop
    • RAM을 늘리는 것만으로는 비효율적.

Design & Implementation

Haystack은 다음과 같은 목표를 달성해서 NFS approach의 bottleneck을 줄인다.

  • 실제 데이터를 읽기 위한 디스크 오퍼레이션을 단 1번만 필요하도록 한다.
  • 메타데이터를 위한 메모리 사용량을 줄임.

기본적인 접근 방식은 여러 사진을 하나의 커다란 파일에 저장하는 방식을 사용하는 것이다.

전체적인 시스템 구성은 Haystack Store, Haystack Directory, Haystack Cache의 3개의 서브시스템으로 이루어져 있다.

  • Store
    • Persistent storage로 파일시스템 메타데이터를 관리하는 컴포넌트.
    • Logical volume은 서로 다른 장비의 Physical volume들로 구성됨.
  • Directory
    • Logical Volume으로부터 Physical volume으로의 맵핑.
    • 각 사진들이 있는 Logical volume이나 여유공간이 있는 Logical volume을 관리한다.
  • Cache
    • 내부적인 CDN과 같은 역할.

웹서버는 디렉토리를 이용해서 URL을 구성하는데, 특이한 것은 사진의 URL에 Logical volume이나 장비에 대한 힌트 등 사진을 가져오기 위한 정보가 들어가 있다는 점이다.

  • URL은 CDN host/Cache/Machine ID/Logical volume, Photo와 같은 형태인데, CDN 또는 Haystack Cache는 (Logical volume, Photo) 정보만을 이용해서 자신의 캐시로부터 사진을 찾는다.
  • CDN이나 Cache로부터 miss일 경우에는 Cache address를 URL로부터 제거하고 Store 장비에 사진을 요청한다.

Haystack Directory

Haystack Directory는 다음과 같은 4가지 기능을 가지고 있다.

  • Logical volume을 Physical volume으로 맵핑
    • 어떤 Store 장비가 소실된 상황에서는 mapping에 상응하는 엔트리를 삭제하고 온라인이 된 새로운 머신으로 대체.
  • 쓰기 액세스를 여러 로지컬 볼륨에 대해 로드 밸런싱, 읽기 액세스를 여러 피지컬 볼륨에 대해 로드밸런싱
  • 리퀘스트가 CDN으로 핸들링되어야할지 Cache로 핸들링되어야할지 결정
  • 로지컬 볼륨이 읽기전용인지 아닌지에 대한 판단. (운영적인 이유나 용량 한계)
    • 새로운 장비를 추가하면 이 장비는 쓰기가 허용되고, 쓰기가 허용된 장비만 사진의 업로드를 받는다.

Haystack Directory는 복제가 되는 데이터베이스에 정보들을 저장하고 PHP interface로 액세스되며, latency를 줄이기 위해 memcache를 사용한다고 한다.

Haystack Cache

Haystack Cache는 Photo ID를 키로 데이터를 찾을 수 있는 Distributed hash table로 구성되어 있다. 어떤 DHT를 사용하고 있는지에 대해서는 자세히 설명되어 있지 않다. 캐시되어있지 않다면, URL에 명시되어 있는 Store 장비에서 사진을 가져온다. 캐시하는 조건이 조금 특이한데 다음과 같다.

  • 사용자로부터 직접 온 리퀘스트이고 CDN이 아닐 때: CDN에서 miss된 것이 내부 캐시에서 hit될 가능성이 낮음.
  • 쓰기가 허용된 Store 장비로부터 사진을 가져왔을 때: 사진은 업로드된 뒤에 헤비하게 액세스되므로 쓰기가 허용된 장비를 과도한 읽기 액세스로부터 보호.

Haystack Store

  • 전반적인 구조
    • 각 Store 장비는 여러 개의 Physical volume을 관리하고, 각 volume은 수백만 개의 사진을 저장한다.
      • 이를테면, Physical volume은 100GB이고, /hay/haystack…<logical volume id>와 같이 저장됨.
    • Store 장비는 Logical volume의 ID와 file offset을 이용해서 사진을 액세스할 수 있다.
    • Store 장비는 각 Physical volume에 대해 file descriptor를 유지한다.
    • Physical volume은 수퍼블럭과 needle의 sequence로 이루어져 있음.
    • needle을 빠르게 가져오기 위해서 각 볼륨에 대해 In-memory data structure를 유지하고 있음.
      • (key, alternative key) -> (needle’s flags, size in bytes, volume offset)
      • alternative key는 4개의 서로 다른 크기의 사진을 나타냄.
  • 읽기 (Photo Read)
    • 읽기 요청에는 Logical volume ID, Key, Alternative key, Cookie가 있다.
    • 사진이 업로드될 때 cookie는 랜덤하게 생성되어 Directory에 저장된다. 사진의 URL을 추측해서 액세스하는 공격을 막아준다.
    • In-memory 맵핑에서 관련된 메타데이터를 찾고, 지워진 사진이 아니라면 적절한 offset을 seek해서 전체 needle을 읽어들임.
    • 읽어들인 needle로부터 cookie를 검증하고 데이터의 integrity를 체크 후 문제가 없다면 응답.
  • 쓰기 (Photo Write)
    • 쓰기 요청에는 Logical volume id, Key, Alternate key, Cookie, Data가 있다.
    • Physical volume file에 needle image를 동기적으로 append하고, In-memory 맵핑을 업데이트 함.
    • 이미 존재하는 사진의 업데이트는 동일한 key/alternate key로 업데이트된 needle을 append하는 것으로 이루어진다.
      • 새로운 needle이 다른 Logical volume에 쓰여진다면, Directory는 애플리케이션 메타데이터를 업데이트하므로 미래의 읽기 요청은 기존 버전을 읽지 않음.
      • 새로운 needle이 같은 Logical volume에 쓰여진다면, 새로운 needle을 append함.
      • 업데이트로 인해 중복된 needle이 발생할 수 있는데, 하나의 Physical volume에서 높은 offset의 needle이 최신 버전의 needle이다.
  • 지우기 (Photo Delete)
    • In-memory 맵핑과 Volume file에 삭제 플래그를 설정.
    • 삭제된 사진에 대한 읽기 요청은 먼저 In-memory 맵핑의 플래그를 체크하고, 삭제 플래그가 설정되어 있다면 에러.
  • 인덱스 파일
    • 이론적으로는 모든 Physical volume을 읽어서 In-memory 맵핑을 복구할 수 있지만, 꽤 시간이 걸리는 일이므로, 인덱스 파일은 In-memory 맵핑을 빨리 빌드하게 도와주어 Store의 재기동 시간을 줄여줌.
    • 각각의 Physical volume에 대해 인덱스 파일을 유지하는데, In-memory 데이터 구조의 체크포인트라고 생각할 수 있다.
    • 인덱스 파일의 구조는 Physical volume file과 유사하게 수퍼블럭과 인덱스 레코드들의 시퀸스인데, Physical volume file에 나타나는 needle과 동일한 순서임.
    • 사진을 추가할 때 needle은 Physical volume file에 동기적으로 쓰지만, 인덱스 레코드는 비동기적으로 쓴다.
    • 사진을 삭제할 때 Physical volume file의 needle에는 삭제 플래그를 설정하지만, 인덱스 파일은 업데이트하지 않는다.
      • 추가적인 동기적인 디스크 쓰기를 피하고, 쓰기나 삭제가 좀 더 빠르게 끝나도록 해준다.
    • 인덱스 레코드가 없는 needle (orphan)이 발생할 수 있음.
      • 재기동을 할 때 orphan에 해당하는 인덱스 레코드의 추가 작업을 수행.
      • 인덱스의 마지막 레코드는 볼륨 파일의 마지막 orphan이 아닌 needle임.
      • 재기동이 완료되면 인덱스 파일만을 사용해서 In-memory 맵핑을 초기화.
    • 삭제된 사진이지만 인덱스 레코드는 이를 반영하지 않는 경우가 발생할 수 있음.
      • 삭제된 사진을 Physical volume으로부터 가져오더라도 전체 needle을 읽으면 삭제 플래그를 검사할 수 있으므로, 그 때 In-memory 맵핑을 업데이트하고 캐시에 오브젝트가 없다고 응답.
  • 파일시스템
    • 현재는 Extent 기반의 파일시스템인 XFS를 사용.
      • 커다란 파일의 블록맵이 메인메모리에 저장되기 충분할 정도로 작다.
      • 효율적인 파일 preallocation을 제공하므로, fragmentation을 완화하고 블록맵이 커지는 것을 막아준다.
    • XFS를 사용해서 파일 시스템 메타데이터를 읽기 위한 디스크 오퍼레이션을 제거할 수 있다.
    • 예외적으로는 사진 데이터가 extent나 RAID stripe 가장자리에 걸쳐져있는 경우 1번 이상의 디스크 오퍼레이션을 필요로 한다.
      • Haystack은 1GB extent를 preallocation하고 256KB RAID stripe size를 사용하므로 실제로는 그런 경우는 드물다.

3.5. Recovery from failures

  • Detection
    • 적극적으로 문제가 있는 Store 장비를 찾기 위해서, 피치포크(pitchfork)라는 주기적으로 Store 장비의 상태를 체크하는 백그라운드 작업을 실행한다.
    • 각 Store 장비로의 접속을 원격에서 체크하고, Volume file의 가용성과 실제로 데이터를 읽는 것이 가능한지 체크한다.
    • 어떤 Store 장비가 일관되게 체크에 실패한다면  피치포크는 자동적으로 그 Store 장비에 있는 모든 Logical volume을 read-only로 표시한다.
  • Repair
    • 한달에 몇 번 정도 전체 동기화 (bulk sync)를 필요로 하는 상황이 있는데, Store 장비의 데이터를 복제본의 Volume file을 이용해 리셋해야하는 경우다.
    • 전체 동기화되는 데이터의 양은 각 Store 장비의 NIC 속도보다 order of magnitude로 크기 때문에 복구까지 몇시간이 걸릴 수 있다.
      • (나의 계산: 1Gbit하에서 10MB/s (10%)를 쓸 수 있는 상황이라면 10분 내에 복구하려면 10MB x 600sec = 6TB 정도. 1MB x 600 sec이라면 600GB.)

3.6. Optimizations

3.6.1 Compaction

  • 삭제되거나 업데이트에 의해 중복된 needle에 의해 사용되는 공간을 되찾기 위한 과정으로, 중복되거나 삭제된 엔트리를 skip하면서 needle을 새로운 파일로 복사함으로써 Volume file을 compaction한다.
  • Compaction 과정 중에 발생하는 삭제는 원래의 Volume file과 새로 빌드하는 Volume file 양쪽으로 적용된다.
  • 이 과정이 파일의 끝까지 진행되면 추가적인 수정을 블록하고 파일과 In-memory 맵핑을 atomic하게 swap한다.

3.6.2 Saving more memory

  • 플래그는 삭제 용도로만 사용하고 있기 때문에, In-memory record의 offset을 0으로 설정함으로써 삭제를 표현하고 플래그 필드는 제거.
  • 메인 메모리에는 쿠키를 저장하지 않고, 디스크로부터 니들을 읽은 후에만 쿠키를 체크한다.
  • 현재는 사진 당 10 bytes의 메모리를 사용함.
    • 하나의 이미지에 대해 4개의 사이즈를 유지하므로, 키 (8 bytes) + alternate key (4 byte) x 4 + data sizes (2 bytes) x 4 = 32 bytes
    • 해시 테이블에 의해 이미지당 2 bytes의 오버헤드.
    • 리눅스의 xfs_inode_t는 536 bytes임.

3.6.3 Batch upload

  • 디스크는 커다란 sequential write일 경우 성능이 좋으므로, 가급적이면 batch upload를 하려고 노력함.
  • 사용자의 앨범 업로드

4. Evaluations

4.1. Characterizing photo requests

  • 사용자는 매일 수백만개의 사진을 업로드함.
  • 옛날 것보다 최근에 업로드된 사진이 더 POPULAR함.

4.1.1. Features that drive photo requests

News Feed와 album이 98%의 Facebook request임.

  • 빠른 popularity 감소는 CDN/Cache가 popular content의 호스팅에 매우 효율적일 것임
  • Longtail 그래프는 무시할 수 없는 수의 리퀘스트가 캐시로 다룰 수 없음을 의미

4.1.2 Traffic Volume

  • Daily Uploaded: ~120 M
  • Haystack Photo WrittenL ~1.44 B (12배 = 4 sizes, 3 locations)
  • Photo Viewed: 80-100 B
  • Haystack Photo Read: 10 B

4.2. Haystack Directory

Hashing policy는 read-write를 잘 분산하고 있음.

4.3. Haystack Cache

80% 정도의 hit rates

4.4. Haystack Store

4.4.1 Experimental Setup

  • 2U storage blade
  • 2 hyper-threaded quad-core Intel Xeon CPUs
  • 48GB memory
  • hardware raid controller with 256-512MB NVRAM
  • 12 x 1 TB SATA drives
  • RAID-6 partition
    • 9TB of capacity
    • 적절한 redundancy와 뛰어난 읽기 성능, 낮은 스토리지 비용.
    • NVRAM write-back cache가 RAID-6의 write performance 저하를 완화
    • Store 장비에서의 캐싱은 비효율적이므로 NVRAM은 쓰기 용도로만 사용
  • Crash나 Power loss 시의 data consistency를 위해서 disk cache(?)는 disable.

4.4.2 Benchmark Performance

  • Randomio
    • random 64KB reads, direct I/O (sector aligned requests)
    • 읽기 throughput의 베이스라인 설정.
  • Haystress
    • buffer cache의 영향을 줄이기 위해 커다란 이미지 셋에 대한 랜덤 읽기를 사용.
    • Workload A randoms reads to 64KB on Store machine with 201 volumes
      • 770.6 qps, 85% of the raw throughput / 17% higher latency
  • 오버헤드의 원인
    • Haystack은 파일시스템 위에서 동작.
    • 디스크 읽기는 전체 needle을 읽기 위해 필요한 64KB보다 큼.
    • RAID-6 device stripe size에 align되지 않았을 가능성이 있으나, 적은 확률일 것임.
    • 인덱스 액세스, checksum 계산 등으로 인한 CPU 오버헤드.

4.4.3 Production workload

  • Multi-write latencies are very flat and stable
    • NVRAM allows us to write needles async and issue a single fsync to flush the volume file once the multi-write is complete
  • Read performance impacted by
    • number of photos stored on the machine
    • cached in the Cache (for read-only, buffer cache would be more effective)
    • recently written photos are usually read back immediately
  • CPU idle time varies 92-96%

Finding a needle in Haystack: Facebook's photo storage 더 읽기"

Timestamps in Message-Passing Systems That Preserve the Partial Ordering

Fidge, C.J., Timestamps in Message-Passing Systems that Preserve the Partial Ordering, Proc. 11th Australian Comp. Sci. Conf., 1988, pp. 56-66.

Timestamping is a common method of totally ordering events in concurrent programs. However, for applications requiring access to the global state, a total ordering is inappropriate. This paper presents algorithms for timestamping events in both synchronous and asynchronous message-passing programs that allow for access to the partial ordering inherent in a parallel system. The algorithms do not change the communication graph or require a central timestamp issuing authority.

이 논문은 Vector Clock으로 알려진 개념을 소개하고 있다. Vector Clock은 [Fidge 1998]과 [Mattern 1998]에서 독립적으로 고안된 알고리즘인데, Fidge의 논문쪽이 좀 더 읽기 쉬운 편이라고 한다.

Limitation of Lamport’s logical clock

Clock Condition. For any events a, b:
if a → b then C(a) < C(b).

Lamport가 정의한 논리적 시계는 두 사건 사이의 happened-before 관계가 있다면 이를 반영하는 시계를 정의하고 있습니다. 그리고 이를 이용해 임의의 완전순서를 정의하고 있습니다. 이 완전순서는 임의의 프로세스 사이의 순서를 이용하고 있으므로 사건들을 정렬할 수 있는 방법 중의 임의적인 한 방법이고, 논리적 시계 이상의 어떤 정보가 추가되지는 않습니다.

Lamport가 정의한 논리적 시계의 문제는 이 논리적 시계를 통해서 사건들 사이의 인과성 (causality)를 알아낼 수 없다는 것입니다. 즉, C(a) < C(b)라고 해서 a  b라고 말할 수 없는 것입니다. 구체적으로 말해, C(a) < C(b)가 의미할 수 있는 것은 a와 b 사이에 happend-before 관계가 있는 것, 즉  a  b인 경우와, a와 b 사이에 happend-before 관계를 판단할 수 없고 b가 a에 대해 happened-before가 아닌 것, 즉 b ↛ a인 경우가 있습니다.

Vector Clock

Vector Clock이라는 단어를 논문에서 사용하고 있지는 않지만, 그 단어가 의미하듯이 논리적 시계의 타임스탬프를 integer가 아니라 다음과 같이 프로세스 수 만큼의 타임스탬프를 가진 배열로 대체하고 있습니다.

[c1, c2 … cn]

그리고 이 타임스탬프를 유지하기 위한 알고리즘은 아래와 같습니다. 타임스탬프가 배열이 되었다는 것 이외에 Lamport의 알고리즘과 큰 차이가 없다고도 얘기할 수 있겠지만, 주의를 기울여야 하는 부분은 메시지를 수신하는 프로세스에서는 메시지를 송신한 프로세스에 해당하는 타임스탬프 배열 항목의 값만을 증가시킨다는 것입니다.

Rule RA 1: Initially all values are zero.

Rule RA 2: The local clock value is incremented at least once before each atomic event

Rule RA 3: The current value of entire timestamp array is piggybacked on every outgoing signal

Rule RA 4: Upon receiving a signal, a process sets the value of each entry in the timestamp array to be the maximum of the two corresponding values in the local array, and in the piggybacked array received. The value corresponding to the sender, however, is a special case and is set to be one greater than the value received (to allow for transit time), but only if the local value is not already greater than that received (to allow for signal “overtaking” as described below), i.e.

q?other_array; /* receive timestamp array from process q */
if local_array[q] <= other_array[q] then
    local_array[q] := 1 + other_array[q];
for i := 1 to n do
    local_array[i] := max(local_array[i], other_array[i]);

Rule RA 5: Values in the timestamp arrays are never decremented.

이 논문에서는 설명하고 있지 않지만, 이러한 알고리즘으로 얻어지는 타임스탬프의 의미는 수신한 메시지에 근거해서 추측할 수 있는 가장 정확한 송신 측의 타임스탬프라는 것입니다. 또한, 수신한 프로세스에서 발생할 현재의 사건에 대해 happened-before 관계에 있는 송신 측의 프로세스의 가장 마지막 사건의 타임스탬프이기도 합니다.

이러한 타임스탬프 배열은 다음과 같이 비교될 수 있습니다.

각각 프로세스 p, q에서 실행된 사건 e, f를 ep, fq로 나타내고, 각 사건에 대한 타임스탬프 array를 Tep, Tfq, 그 타임스탬프 배열 중 프로세스 p에 해당하는 타임스탬프를 Tep[p], Tfq[p] 라고 할 때,

ep → fq iff Tep[p] < Tfq[p]

여기서 주목할 점은 iff 즉, 사건의 happened-before 관계와 타임스탬프의 대소 관계가 동치관계라는 것입니다. 임의의 두 사건이 주어졌을 때 인과성이 있는지 여부와 어떤 사건이 어떤 사건에 대해 인과성이 있는지를 판단할 수 있습니다.

ep ↔ fq if Tep[p] ≮ Tfq[p] and Tfq[p] ≮ Tep[p]

타임스탬프 사이의 대소 관계가 어느 방향으로도 성립하지 않는다면 두 사건은 동시적(concurrent)하다고 얘기할 수 있습니다.

Applications

그렇다면 논리적 시계를 통해서 인과성을 알아내는 것은 왜 필요한가에 대해서 이 논문이 예로 들고 있는 것 중 하나는 여러 프로세스들에 의해서 저장되는 상태의 스냅샷들에 타임스탬프를 이용해서, 전체 프로그램 상태에 대해 정상적인 상태인지 검사를 할 수 있고 이를 통해 역실행 (reverse execution), 에러의 복구, 롤백 등이 가능하다는 것입니다. 실제로 이후에 발표된 Dynamo나 Riak 등 Vector Clock을 이용하는 것으로 알려진 시스템에서는 동시적인 쓰기 등에 의해 어떤 데이터에 대해 2개 이상의 복제본이 발생했을 때 이를 해소하기 위해서 인과성을 활용하고 있는 사례를 볼 수 있습니다.

References

  • [Fidge 1998] Fidge, C.J., Timestamps in Message-Passing Systems that Preserve the Partial Ordering, Proc. 11th Australian Comp. Sci. Conf., 1988, pp. 56-66.
  • [Mattern 1998]  Mattern, F. Virtual time and global states of distributed systems. Proc. “Parallel and distributed algorithms” Conf., (Cosnard, Quinton, Raynal, Robert Eds), North-Holland, 1988, pp. 215-226.

Timestamps in Message-Passing Systems That Preserve the Partial Ordering 더 읽기"

Time, Clocks, and the Ordering of Events in a Distributed System

Leslie Lamport. 1978. Time, clocks, and the ordering of events in a distributed system.Commun. ACM 21, 7 (July 1978), 558-565.

The concept of one event happening before another in a distributed system is examined, and is shown to define a partial ordering of the events. A distributed algorithm is given for synchronizing a system of logical clocks which can be used to totally order the events. The use of the total ordering is illustrated with a method for solving synchronization problems. The algorithm is then specialized for synchronizing physical clocks, and a bound is derived on how far out of synchrony the clocks can become.

이 논문에 대해서는 Paxos Made Simple에서 이미 아래와 같은 소개를 한 적이 있습니다.

Leslie Lamport는 분산 컴퓨팅 분야에서는 너무나 유명한 분이기 때문에 따로 설명할 필요가 없을 정도입니다. 예를 들어, 1978년에 출판된 “Time, Clocks, and the Ordering of Events in a Distributed System”과 같은 페이퍼는 인용 회수로 볼 수 있는 그 영향력 뿐만 아니라 OS 수업의 읽기 과제로도 빠질 수 없는 그야말로 seminal work입니다.

1. “Happened before” 관계

가장 먼저, “Happened before” 관계라는 분산시스템 내에서 사건(event)들의 순서를 표현할 수 있는 부분순서(partial ordering)를 제안하고 있습니다.

“Happened before” relation은 분산시스템에서 구현이 어려운 물리적 시계 (physical clock)를 사용하지 않더라도 사건들의 순서를 표현할 수 있음을 보여주고 있습니다. 즉, 사건이 발생한 전역적으로 통용되는 시각을 알지 못해도 우리는 자연스럽게 사건의 순서를 정할 수 있습니다.

또한, 서로 다른 프로세스 사이에서 메시지를 주고 받을 경우에 사건 사이의 인과성(causality)이 발생하는 것을 다음과 같은 정의에 담고 있습니다.

Definition. The relation → on the set of events of a system is the smallest relation satisfying the following
three conditions:

  1. If a and b are events in the same process, and a comes before b, then a → b.
  2. If a is the sending of a message by one process and b is the receipt of the same message by another process, then a → b.
  3. If a → b and b → c then a → c.

Two distinct events a and b are concurrent if a ↛ b and b ↛ a. Assume a ↛ a for any event a. So → is an irreflexive partial ordering on the set of all events in the system.

2. Logical Clocks

논리적 시계는 하나의 프로세스 내의 각 사건들에 대해 단조 증가하는 숫자를 할당하는 것으로 볼 수 있으며, 물리적 시계가 아니라 사건이 발생한 순서에 기초해야 합니다.

Clock Condition. For any events a, b:
if a → b then C(a) < C(b).

“Happened before” 관계의 정의에 따라, 각 프로세스는 사건이 발생할 때마다 증가하는 타임스탬프를 부여하며, 프로세스 사이에 메시지를 송신할 때 타임스탬프를 함께 송신하며, 메시지를 수신하는 측에서는 수신한 타임스탬프보다 더 큰 타임스탬프를 새로운 사건에 할당합니다.

3. Ordering the Events Totally

위에서 정의된 논리적 시계만으로는 분산시스템 내의 사건들의 완전순서(total ordering)는 불가능하기 때문에, 프로세스 사이의 임의의 순서(!)를 도입해, 완전순서를 만들 수 있습니다.

In case two or more events occur at the same time, an arbitrary total ordering ≺ of processes is used. To do this, the relation ⇒ is defined as follows:

If a is an event in process Pi and b is an event in process Pj , then a ⇒ b if and only if either:
i. Ci<a> < Cj<b> or
ii. Ci<a> = Cj<b> and Pi ≺ Pj

이렇게 정의된 완전순서를 이용해 분산시스템에서의 상호배제(mutual exclusion) 문제를 해결하는 방법을 제시하고 있습니다.

간단히 얘기하면, 어떤 프로세스가 자원을 요청할 때는 자신의 타임스탬프를 함께 다른 모든(!) 프로세스들에게 전송하고, 이 자원의 요청은 요청한 프로세스 뿐만 아니라 모든 프로세스의 큐에 위에서 정의된 완전순서대로 정렬됩니다. 이 큐에서 가장 앞에 있는 자원 요청이 자원을 획득해야한다고 볼 수 있는데, 모든 프로세스들이 동일한 큐를 유지해야하기 때문에, 자원 획득을 위해서는 모든 다른 프로세스들로부터 receipt 메시지를 수신한 조건에서만 자원 획득이 가능합니다. 또한, 다른 프로세스의 자원 요청에 대한 응답과 자신의 자원 요청의 (타임스탬프의 순서는 올바르다고 하더라도) 전송 순서가 바뀌는 경우는 두 프로세스 사이의 통신은 순서대로 일어난다는 가정에 의해 발생하지 않습니다.

이 구현은 1개 이상의 프로세스의 실패나 메시지의 소실 등을 가정하고 있지 않기 때문에 현실적으로 사용할 수 있는 알고리즘은 아니지만, 분산시스템에서의 완전순서의 유용성을 보여주고 있다고 생각합니다. 그리고, Lamport가 이후에 제안한 Paxos가 바로 현실적인 환경 하에서의 완전순서를 보장하기 위한 방법을 제공하고 있는 것으로 보입니다.

4. Anomalous Behavior

위에서 제시한 상호배제 알고리즘에서 사건 사이의 인과성이 외부화됨으로써 발생할 수 있는 문제를 제시하고 있습니다. 예를 들어, 자원 요청 A를 한 후, 전화를 걸어 다른 컴퓨터에서 자원 요청 B를 한 경우, 자원 요청 A와 자원 요청 B 사이에는 실제로 인과성이 존재하지만, 논리적 시계의 체계는 이를 인지하지 못하므로, 자원 요청 B가 더 낮은 타임스탬프를 획득해 먼저 자원을 획득할 수 있다는 것입니다.

이를 해결하기 위한 방법으로 2가지 방법을 제시하고 있는데, 첫번째는 “happened-before” 관계에 필요하지만 외부화된 정보를 시스템 내로 도입하는 것입니다. 즉, 자원 요청 B를 하는 사용자에게 자원 요청 A 보다 더 이후의 타임스탬프를 발급하도록 자원요청 A의 타임스탬프를 전화를 통해 알려주는 방법 등으로 논리적 시계 체계를 유지하는 책임을 부여하는 것입니다.

다른 방법은 외부화된 사건들 사이의 관계를 포함해 모든 사건들을 정렬할 수 있는 물리적인 시계의 체계를 구성하는 것입니다. 이 시계들은 다음의 Strong Clock Condition을 만족해야 합니다.

Let S be the set of all system events and S be the set containing S along with relevant events external
to the system. Let ↪ denote the “happened before” relation for S.

For any events a, b in S: if a ↪ b then C<a> < C<b>.

5. Physical Clocks

Strong Clock Condition을 만족하기 위해서는 물리적 시계는 다음과 같은 조건을 만족해야합니다.

Let Ci(t) denote the reading of clock Ci at physical time t. Assume that Ci(t) is a continuous, differentiable
function of t except for isolated discontinuities introduced by clock resets.
PC1. There exists a constant k << 1 such that for all i: |dCi(t) / dt – 1| < k.
PC2. For all i, j : |Ci(t) – Cj(t)| < e.

PC1은 물리적 시계가 가는 속도가 물리적 시간이 흐르는 속도와 일정 오차 범위 내에 있다는 것을 의미합니다. 일반적인 수정 발진 방식의 시계의 경우, k는 약 10-6정도로 언급하고 있으며, PC1 자체는 만족되는 것으로 가정하고 있습니다.

한편, PC2는 물리적 시계들 사이의 오차가 일정 범위 내에 있다는 것을 의미합니다. PC2를 보장하기 위해서 물리적 시계의 보정을 위한 구현 방법을 제시하고 있습니다.

Let vm = t’ – t be the total delay of m which is unknown to the receiving process. Let µm be some minimum delay >= 0 known by the receiving process such that µm <= vm.

IR2′. a. If Pi sends a message m at physical time t, then m contains a timestamp Tm = Ci(t).
b. Upon receiving a message m at time t’, process Pj sets Cj(t’) equal to max(Cj(t’ – 0), Tm +
µm).

메시지에 타임스탬프를 넣어서 보내고, 이를 수신하는 측에서는 메시지를 통해 받은 타임스탬프와 최소 통신 지연 시간 이후의 시각으로 시계를 재설정하는 방식이라고 할 수 있고, 이러한 구현 요구사항을 만족한다면, PC2를 만족하는 것을 증명할 수 있다고 합니다. 이 증명에서는 일정 시간 내에 메시지가 전체 프로세스 사이로 전송되는 상황을 가정하고 있으므로, 메시지를 주고 받지 않는 조건을 염두에 두고 있는 것 같지는 않습니다.

한편, 최소 지연시간에 해당하는 µm은 일반적으로 거리와 빛의 속도로 계산할 수 있는 수준의 값으로 언급되고 있습니다. 그리고, 주어진 µm에 대해 k와 e가 얼마나 작아야 하는지에 대한 계산 방법도 설명하고 있습니다.

현실적으로는 이 논문에서 정의된 Physical Clock은 항상 가장 빠른 시계에 맞춰지게 되므로 실제 시간 (physical time)보다 더 빨라지게 되겠지만, 분산시스템의 동기화 문제를 해결하는 능력과는 무관하다고 할 수 있을 것 같습니다.

6. Closing

이 논문은 분산시스템의 문제에 대한 이해에 가장 중요한 사고 도구로 사용할 수 있는 중요한 개념들을 정의하고 있는 것 같습니다. 내용 자체를 정확하게 이해했는지의 여부를 차치하더라도, 이러한 개념이 마음에 와닿기 위해서는 몇 번은 더 읽어봐야 할 것 같습니다.

다음에는 Vector Clock에 관한 논문인 C. Fidge의 Timestamps in Message-Passing Systems That Preserve the Partial Ordering을 읽어볼 예정입니다.

Time, Clocks, and the Ordering of Events in a Distributed System 더 읽기"

Actors in Scala

 

Actors in ScalaActors in Scala by Philipp Haller and Frank Sommers

150페이지 남짓의 이 책은 Scala의 Actors 라이브러리에 대한 책이다. 이 책의 저자 중 한명인 Philipp Haller는 Scala Actors 라이브러리의 저자다.

이 책은 Scala Actors를 다루는 것에 대한 기본적인 내용과 Exception/Termination handling, Actor 실행 방법의 customization, Remote Actors를 상세하게 다루고 있다. 원래는 Actor를 사용하는 패턴도 기대하고 있었지만 MapReduce와 같은 몇가지 예시 정도 외에 좀 더 완전한 내용의 것은 찾을 수 없었다. 하지만, 기본적인 Actors 라이브러리의 기본적인 개념을 익히는데에는 좋은 책일 듯 하다.

한가지 문제라면, Scala 2.10 부터는 Actors 라이브러리가 deprecated 되었고, Akka로 대체되었다는 것이다. Scala Actors의 자세한 히스토리는 잘 모르지만, Akka는 Scala Actors와 커다란 차이는 없는 반면 더욱 일관성이 있는 느낌이 든다. Scala Actors와 Akka의 차이에 대한 사항은 Scala Actors Migration Guide를 참고하면 좋을 듯 하다.

Akka는 Java와 Scala 모두 지원하고 있기 때문에, 기존에 Java 위주의 프로젝트를 진행하고 있었지만 Scala 프로젝트도 함께 진행하기 위해 연동해야 하거나 Scala 프로젝트로 전환하는 경우의 통합 수단으로서, 기존의 language-agonostic RPC들, ZeroMQ 등의 대안으로 좋은 선택이 될 수 있지 않을까 생각해보고 있다.

Actors in Scala 더 읽기"