Ensuring Data Consistency for Continuously-modified Data (ft. Topological Sorting)

Christine Baek
Mixpanel Engineering
9 min readOct 20, 2023

--

At Mixpanel, data trust and reliability are of utmost importance. In order to safely roll out code and infrastructure changes, we maintain two independent zones of ingestion infrastructure per region. However, having two zones is not sufficient to ensure safety and we also need a way to detect changes that cause data corruption. Previously, we have written about ensuring data consistency across replicas for real-time data. This post is a follow-up that covers the work the storage infrastructure team at Mixpanel has done to guarantee that all write and modification operations on the data we store are checked for cross-replica consistency.

The high-level architecture of our data system

blue boxes represent services, and pink boxes represent databases

Above is a simplified diagram illustrating the different write (insert or update) operations for Mixpanel’s distributed, multi-tenant database, known as ‘ARB’. We replicate the above setup in two clusters in different zones, per each region (US and EU). Each zone is designed to ingest, transform, and store customer data independently from the other zone.

Actual data that is sent in by the customer, and used for analysis within Mixpanel, is transformed by our ingestion pipelines, pushed to Kafka, then pulled off of Kafka by our Tailers services and stored in GCS long-term. Manifester keeps an index of all files belonging to each customer project and updates metadata such as the start and end Kafka offsets associated with each file. Files created from these tailing operations are periodically closed and compacted to switch from an append-only file, optimized for writes (for real-time data availability) to a columnar format, which is a more query-friendly file format, optimized for reads. Multiples of such files are created, and these too get compressed into a single file on a regular cadence (via process labeled as ‘multi-compaction’ above), for query performance and storage efficiency.

A quick recap of the prior project (checking for consistency of real-time data)

The previous blog post covered the case of ensuring the consistency of data that was ingested in real-time, as highlighted above. The main challenge for this effort was designing and implementing a deterministic file cutover strategy for streaming data that has to be available in real-time. While the previous blog post goes into detail about the intricacies that were required to make this a reality, the basic premise is that the comparability of the two files cross-zone is guaranteed by ensuring that the start and end offsets for each file are consistent and deterministic. Because of Kafka’s in-order guarantee within a partition, 2 files containing messages with the same start and end offsets should contain the same set of events in order, making them directly comparable. Their checksums are then compared to ensure that the content of these files has not been corrupted and can be declared consistent. Any file that fails this check would be rejected, and the team would be alerted to investigate further.

Overview of ensuring consistency for non-realtime, existing data

Even after data makes it into GCS through the normal ingestion path, it is highly unlikely that the data is in its final form. Various operations can be done on the existing data, as highlighted in the diagram above. Given that having too many files that need to be scanned can result in degraded query performance, single-pass compacted files are often compressed together for lowered storage costs as well as enhanced query performance. It is also possible for files to be partially deleted for compliance and privacy reasons. Lastly, resharding involves generating a new copy of all existing data and storing it in the new sharding spec. All these operations are adding new files (where the files are derived from existing data) to our proprietary database which is checked for consistency cross-zone at the time of add, and is the main topic of this blog post.

In the case of real-time data, the main challenge for achieving deterministic and identical files (and therefore comparable cross-zones) was establishing discrete start and end points in a continuous stream of new data. This was achieved by relying on elements such as file size and message age. Each file being limited to a single Kafka partition meant the content of the file was guaranteed to be in order, inclusive of all messages between the start and end offset pairs, and all variables that were required for determinism were self-contained (ex: age of the first message in the file). In other words, as long as the start and end offsets were the same, the files themselves could be trusted to be identical and comparable.

In contrast, modification and combining of existing data has a different set of challenges it needs to overcome to guarantee identity. In these cases, data from multiple partitions are grouped together, or new files are created from a subset of existing files. Previously, it was guaranteed that a file with a given start and end offset pair would contain all events belonging to those offsets and all other events in between, but this is an assumption that is no longer reliable. Each type of modification represents a unique challenge with its own solutions to guarantee identity.

Re-shard (copying existing data into other partitions)

Resharding is used to either increase or decrease the number of shards that a project is allotted, to optimize query performance and storage efficiency. It involves reading from source files in the original shard specifications and copying over the data to a new file for the destination shard at a per-distinct-id granularity (Mixpanel shards data based on distinct id). This particular method of data writing is relatively straightforward to implement consistency checks for, because while it is creating a new file with the same start and end offset pairs as the original files only with a subset of data, that output file is for a new destination shard, and is checked for consistency with its new shard, cross-zone counterpart when added to the manifest.

Invariant and variable refer to what remains constant vs is changed between the input and output files

  • Invariant: start and end Kafka offsets (start and end offsets remain the same)
  • Variable: the actual content of data (only a subset of the data is copied to the new file), and the shard that the file belongs to

GDPR (selective deletion of data)

Mixpanel supports selective deletion of data for compliance purposes, which is accomplished by reading an existing file and generating a new file excluding the delete-requested data. The above diagram illustrates the input file, which contains the delete-requested data (marked by a pink crosshatch). The output file post-scrub has the same start and end offsets as the input file but no longer has the same content or checksum value as a result of the scrub. Like reshard, GDPR also takes an existing file and produces a new file containing a subset of the original data but the key difference is that the output file still belongs to the same shard (whereas in reshard, each file was destined for a new shard). By relying on existing comparison rules of just shards & offsets, a post-scrub file may prematurely compare to a pre-scrub file in the other zone, resulting in a false positive for data corruption.

  • Invariant: start and end Kafka offsets (start & end offsets remain the same), shard
  • Variable: actual content of data (some subset of the data is deleted)
  • Challenge: keeping track of which deletion requests have and have not been processed, given that all files now have the same start & end offsets, but the actual content (and therefore the checksum values) depends on which deletion requests have been processed for that file
  • Solution: store the list of processed deletion requests per file, and files are comparable if and only if they have processed the same set of deletion requests (in addition to the start & end offset match requirements)
  • Challenge: additional guards are needed to ensure the delete operation is not racing against real-time ingestion for the delete-requested data
  • Solution: only data sent in up to the time of deletion request submission is eligible for scrubbing. This is to make sure that the scrub operations are idempotent cross-zone, even when there is an ingestion delay in one or more zones

Multi-compaction

This is another case of operating on and modifying existing data in GCS, but having the opposite invariant and variable from the GDPR case. Multi-compaction is the process of taking multiple files (that may or may not contain data from a single Kafka partition), and compressing them into a single file. This is a continuous process running in the background as additional data is ingested into Mixpanel, to ensure that the data is being stored in the most optimal format possible. The existing heuristic was to pick the most number of files that would fit into a single compaction run, but this strategy had to be modified to account for the ordering of offsets within each partition since a file can contain more than one partition’s data. Files have to be grouped in a manner that is consistent with the guarantee that each file with a given start and end offset pairs contains all relevant data belonging to those offsets and all other offsets in between for any given partition.

These files are in a single shard and can be compacted together. A-partition 1: [1:2], partition 2: [1:2]. B-partition 3: [2:3], partition 4: [2:3]. C-partition 2: [4:6], partition 3: [4:6]. D-partition 1: [4:6], partition 4: [4:6]. E-partition 2: [7:8]

Each rectangle represents a file with start and end offsets, as well as partitions it covers. For example, file E contains data from partition 2 only, with offsets [7:8]. To ensure determinism, Files A, B, and E cannot be compacted together without file C even though files A and B or files B and E do not have any intrinsic hierarchical relationship between each pair due to the lack of shared partitions. If files A, B, and E were to be compacted together without file C, partition 2 would violate the guarantee that all data within the offset pair must be present for each partition. To ensure all partitions are in order and not missing any files in between, a directed acyclic graph (DAG) is used to represent the candidate files for multi-compaction, and topologically sorted to ensure that contiguous files are accounted for. The example above can be translated into the graph below, which is then used to select files for multi-compaction. While the primary criteria are the offset pairs, other properties such as size and even file name are used to make the selection process as easy to reason about and as deterministic as possible.

If files A, B, C, and E were compacted together (without D), the resulting file would have the following offset pairs associated with it :

  • partition 1: [1:2]
  • partition 2: [1:8]
  • partition 3: [1:6]
  • partition 4: [1:3]
  • Invariant: events (content) of the files
  • Variable: start and end offsets of the input vs output file
  • Challenge: selecting files for compaction needs to abide by the rule that all Kafka messages between start and end offsets are included, but not all of these files may be directly comparable to one another for ordering purposes (due to lack of shared partitions)
  • Solution: topological sorting of files, for selecting files to compact together. Edges are connected based on the order of offsets for each partition.

Conclusion

All writing and modifying updates on Mixpanel customers’ event data are now checked for integrity and consistency across zones. Our previous work kicked off the journey by achieving identity for real-time streaming data, and we have now closed the loop by bringing cross-zone data consistency checks to all event data modifications. Data trust and reliability are of utmost importance at Mixpanel, and we will continue to work on improving the system for the data that we were entrusted with.

If working on problems like this interests you, join us!

--

--