Strategies For Effective Data Compaction

Matthew Hoare
Mixpanel Engineering
7 min read · Mar 30, 2023


As Mixpanel grows alongside its customers, the amount of data we store also increases. One of Mixpanel’s greatest features is its ability to analyze data within seconds of its creation. However, to achieve this, we need to store and modify data in more efficient formats through a process called compaction. At Mixpanel, compaction is defined as any action that rewrites files. We may need to rewrite files for various reasons, but balancing maximum throughput, compliance with laws, and cost management is important. This post discusses how we achieved this balance while simplifying our architecture.

Background

Mixpanel employs a distributed, multi-tenant database called Arb. The Storage Server service is a monolith that manages most storage functions, including compaction. Each storage server is accountable for a specific subset of files and runs various loops over those files to determine the required compactions. When a compaction must be performed, the storage server uses a gRPC call to our compaction service with the necessary arguments, as demonstrated below:

The compaction service commonly combines multiple files into a smaller file, deletes events and people data to comply with General Data Protection Regulation (GDPR) and similar laws, and extracts data for identity management purposes, moving it between locations as necessary.

Problems

Scalability

Currently, there are three bottlenecks to scaling compaction. The first is the speed at which loops can run in the Storage Server to determine what needs compacting. Since the Storage Server is a monolith running on a large machine, scaling up is costly, complicated, and time-consuming.

The second bottleneck is the metric used to scale the compaction service. We scale it based on the “resource exhausted” metric from gRPC calls. However, this metric is a poor scaling signal, as explained in the power-of-2-choices blog. We resolved our initial scaling issue by implementing power-of-2-choices for gRPC connections, but we soon encountered other problems.
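For readers unfamiliar with the technique, here is a minimal sketch of power-of-2-choices load balancing in its general form: sample two backends at random and send the request to the one with fewer in-flight requests. It is not Mixpanel's gRPC implementation; the function name `pick_backend` and the pod names are hypothetical.

```python
import random


def pick_backend(outstanding, rng=random):
    """Pick a backend using power-of-two-choices.

    `outstanding` maps a backend name to its number of in-flight requests.
    """
    names = list(outstanding)
    if not names:
        raise ValueError("no backends available")
    if len(names) == 1:
        return names[0]
    # Sample two distinct candidates at random and keep the less loaded one.
    first, second = rng.sample(names, 2)
    return first if outstanding[first] <= outstanding[second] else second


# Hypothetical in-flight request counts for three compaction pods.
load = {"pod-a": 12, "pod-b": 3, "pod-c": 7}
choice = pick_backend(load, random.Random(0))
load[choice] += 1  # the chosen pod now has one more request in flight
```

Because the busier of the two sampled pods is never chosen, the most loaded pod in this example cannot receive the request, which is what smooths out load compared with purely random selection.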

The third bottleneck is the maximum number of outstanding connections a storage server can handle. This limit capped our compaction throughput, and we were quickly approaching it.

Visibility

While we could see the number of outstanding gRPC calls, we lacked a true backlog metric for these loops. This made it difficult to determine how far behind we were. The definition of “behind” depends on the compaction operation. For example, for GDPR deletions, “behind” means how close we are to our 30-day service level agreement (SLA). In contrast, for combining files, it means how long ago that operation should have taken place.
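The two definitions of “behind” can be sketched as two small functions. This is an illustration only: the function names, timestamps, and the idea of a per-file "due" time are assumptions, and only the 30-day SLA comes from the text above.

```python
from datetime import datetime, timedelta, timezone

GDPR_SLA = timedelta(days=30)  # the 30-day SLA mentioned above


def gdpr_sla_remaining(requested_at, now):
    """Time left before a deletion request breaches the SLA (negative means breached)."""
    return requested_at + GDPR_SLA - now


def merge_lag(due_at, now):
    """How long ago a merge should have run (zero if it is not due yet)."""
    return max(now - due_at, timedelta(0))


# Hypothetical timestamps for one deletion request and one pending merge.
now = datetime(2023, 3, 30, tzinfo=timezone.utc)
remaining = gdpr_sla_remaining(datetime(2023, 3, 10, tzinfo=timezone.utc), now)
lag = merge_lag(datetime(2023, 3, 29, 18, tzinfo=timezone.utc), now)
```

Here `remaining` is 10 days and `lag` is 6 hours. The two numbers have different units of urgency, which is why a single "outstanding calls" count could not serve as a backlog metric for both loops.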

Utilization

The compaction loops that run in the storage server, combined with the lack of knowledge about the actual backlog size, waste a lot of resources.

First, the compaction loops in the storage server can use up to 40GB of memory, which is over 30% of the server’s available memory. Although this usage is not one of the server’s most critical functions, it causes a lot of garbage collection overhead, resulting in high CPU cycle consumption.

Second, we could not optimize our publishing rate to reduce costs because we did not know the actual size of the backlog for a compaction type. Because we prioritize customer experience, we always over-provisioned, which drove up costs.

Possible Solutions

Pulling out loops into their own service

One simple solution would be to move the compaction loops from the storage server to their own service. This would free up memory and CPU cycles for the storage server’s critical functions. However, this alone would not resolve the visibility issues or let us scale publishing. We wanted a solution that addressed all of the underlying problems.

Storage Server + Queue

We did consider adding a queue between the storage server and our compaction service pods. This would reduce the number of outstanding gRPC connections, ensuring that compaction instances only accept work when ready. It would also greatly increase visibility by allowing work to be published into the queue and using queue metrics to estimate the amount of work. This approach would also free us from the limitations of gRPC connections. Unfortunately, it would not free up storage server memory and CPU.

Task Workflow System + Queue

Internally, we have a distributed task job system similar to Apache Airflow. While we considered using this system, it currently has limitations regarding the granularity of the unit of work. This would not have been suitable for some compaction loops. Ultimately, this tool was not the right fit for the job.

Splitting out each loop into its own service + queue

We have decided to split the compaction loops into their own service and introduce a queue between the compaction service and the loops. This change will improve the performance of the storage server by eliminating the need to perform the compaction loop work and keep open gRPC connections. Introducing a queue will also provide visibility into the work. This concept is called pull-based compaction.

Pull-Based Compactions

After considering all possible solutions, we determined that the best option for the future of our architecture is to switch to a pull-based compaction system, with each loop having its own service. Although this change requires a significant architectural shift, we believe it is necessary to implement now.

How it works

Below is a high-level overview of what a compaction loop service looks like:

Pull-based compactions have four major components, each with its own challenges:

Scheduler

All metadata related to Mixpanel files is stored in Google Cloud Spanner in what we call the file manifest. We scan Spanner and examine metadata such as size, age, and other relevant factors to determine if files need compacting. However, scanning this database in sequence would take an eternity, as it is over 7TB in size and has over 17 billion rows.

Fortunately, Spanner provides a useful type of query called a partition query. This query returns multiple subqueries that can be distributed to different nodes. We can specify the number of subqueries to return using Spanner’s options. We can then distribute these subqueries to publishers to perform the necessary fan-out.
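The fan-out step can be sketched without any Spanner dependency. The example below treats each subquery as an opaque token and hands the tokens to publishers round-robin. The function `assign_subqueries`, the token strings, and the publisher names are hypothetical, and round-robin is an assumed policy rather than the one the scheduler necessarily uses.

```python
def assign_subqueries(subqueries, publishers):
    """Distribute opaque subquery tokens across publishers round-robin."""
    if not publishers:
        raise ValueError("at least one publisher is required")
    assignment = {publisher: [] for publisher in publishers}
    for index, subquery in enumerate(subqueries):
        target = publishers[index % len(publishers)]
        assignment[target].append(subquery)
    return assignment


# Ten placeholder tokens standing in for the subqueries a partition query returns.
partitions = [f"partition-{i}" for i in range(10)]
plan = assign_subqueries(partitions, ["publisher-0", "publisher-1", "publisher-2"])
```

Each subquery lands on exactly one publisher, so the full manifest scan is covered once while the work proceeds in parallel.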

Publisher

The scheduler provides each publisher with a set of subqueries. Publishers execute these subqueries on Spanner and process the resulting data. If files require compaction, the corresponding metadata is published to the queue for the compaction consumers to handle. We may scale up the number of publishers in the future to handle more data as our company grows.
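A publisher's core loop might look roughly like the sketch below: inspect each metadata row, decide whether the file needs compaction, and publish the ones that do. The `FileMeta` fields, both thresholds, and the `publish` callback are assumptions made for illustration; the post does not state the actual criteria.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FileMeta:
    path: str
    size_bytes: int
    age_seconds: int


# Hypothetical thresholds; the real criteria are not given in the post.
SMALL_FILE_BYTES = 64 * 1024 * 1024
MIN_AGE_SECONDS = 3600


def needs_compaction(meta):
    """Treat small files that have been around for a while as merge candidates."""
    return meta.size_bytes < SMALL_FILE_BYTES and meta.age_seconds >= MIN_AGE_SECONDS


def publish_candidates(rows, publish):
    """Call publish(meta) for each row that needs compaction and return the count."""
    count = 0
    for meta in rows:
        if needs_compaction(meta):
            publish(meta)
            count += 1
    return count


rows = [
    FileMeta("files/a", 10 * 1024 * 1024, 7200),   # small and old: candidate
    FileMeta("files/b", 512 * 1024 * 1024, 7200),  # already large: skipped
    FileMeta("files/c", 1024, 60),                 # too new: skipped
]
queue = []  # stands in for the real message queue
published = publish_candidates(rows, queue.append)
```

Passing `publish` as a callback keeps the decision logic independent of the queue client, which makes it easy to test with a plain list as shown.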

Queue

We use PubSub for all our compaction services because of its desirable properties. It can scale up to as many consumers as necessary, avoid head-of-line blocking, and typically deliver the oldest messages first.

Compaction Consumers

Compaction Consumers will retrieve the file metadata from the queue and perform the compaction operation on those files. A useful feature is that since these pods execute the same compaction operation, we can optimize the machine type to improve utilization and reduce costs.

In a perfect world, this project would be considered complete. Compaction consumers would only take on the work they can handle and avoid getting into an unrecoverable state. Although all compactions are the same type, they differ in their resource requirements. Some messages may require two to three times more work than the average message in the queue. The two resources that primarily constrain our processing are memory and disk I/O. If either becomes bottlenecked, then virtually no work gets done. Memory is important to prevent us from thrashing the page cache as we hold all the files we compact in memory, and disk I/O is important for downloading the files from Google Cloud Storage (GCS) and writing the result locally.

To address this issue, we implemented an in-memory local fair queue that efficiently bin packs the compaction consumer without overloading it. When retrieving messages from PubSub, we accurately estimate the required resources. The local fair queue ensures that we do not starve bigger, more resource-heavy messages while processing as many smaller, less resource-heavy messages as possible.
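One way such a queue could behave is sketched below. It is an assumed policy, not Mixpanel's actual implementation: small messages may start ahead of a large message that does not fit yet, but once the large message has been passed over more than `max_skips` times, nothing behind it is admitted until it can run. The class name, the single `memory` resource, and `max_skips` are all hypothetical.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class Message:
    name: str
    memory: int       # estimated memory units needed to compact this message
    skipped: int = 0  # admit rounds in which this message did not fit


class LocalFairQueue:
    def __init__(self, capacity, max_skips=2):
        self.capacity = capacity
        self.max_skips = max_skips
        self.in_use = 0
        self.waiting = deque()

    def push(self, msg):
        if msg.memory > self.capacity:
            raise ValueError("message can never fit on this consumer")
        self.waiting.append(msg)

    def admit(self):
        """Return the messages that may start now, reserving their memory."""
        admitted, remaining = [], deque()
        blocked = False  # True once a starved message must be served next
        while self.waiting:
            msg = self.waiting.popleft()
            if not blocked and self.in_use + msg.memory <= self.capacity:
                self.in_use += msg.memory
                admitted.append(msg)
            else:
                if not blocked:
                    msg.skipped += 1
                    # Stop admitting behind a message that has waited too long.
                    blocked = msg.skipped > self.max_skips
                remaining.append(msg)
        self.waiting = remaining
        return admitted

    def release(self, msg):
        """Free the memory reserved for a finished message."""
        self.in_use -= msg.memory
```

A consumer would call `push` for each message pulled from the queue, `admit` to find out which compactions can start, and `release` when one finishes. A real version would also need to track disk I/O, the other constrained resource named above.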

One might ask, “Why don’t we have two or more queues based on message size?” While this approach could work, it would not fully solve the underlying problem of messages having varying sizes. Instead, we can make more meaningful changes in upstream services. That said, the in-memory fair queue was the simplest solution that provided most of what we wanted to achieve.

Below is a visual example:

Results

We are pleased with the results. The following example shows that removing the GDPR loop from the storage server frees up 40GB of memory. It should be noted that an old version was deployed on July 26–27 to address an incident, and memory spiked back up temporarily.

Furthermore, the loop that merges several files into one has given us an updated graph of the number of pending operations. This allows us to optimize our compaction services to tackle the backlog effectively.

We are excited about the new possibilities that this can bring us. If working on problems like these interests you, come join us!
