Sleeping at Scale - Delivering 10k Timers per Second per Node with Rust, Tokio, Kafka, and Scylla
22 Jul 2024 (6 months ago)
Journey Builders and the Need for a Timer System
- Lily Mara, an engineering manager at OneSignal, a customer messaging company, discusses the development of a scalable, high-throughput timer system.
- The system was built to enable the creation of Journey Builders, a no-code system that allows marketers to build custom messaging flows for users.
Journey Builders Functionality
- Journey Builders involve decision nodes based on user data and scheduled actions, such as sending push notifications or SMS messages.
- The timer system was necessary to implement the scheduling aspect of Journey Builders, allowing for delays and waiting periods within the messaging flows.
Requirements for the Timer System
- The team at OneSignal needed a high-performance timer system to handle 10,000 timers per second per node.
- They considered using existing open-source queuing systems like Sidekick and RabbitMQ, but these systems lacked the performance needed and had features they didn't require.
- They decided to build their own timer system, prioritizing performance, data loss minimization, and interoperability with their existing systems.
OneSignal's Existing Systems
- OneSignal's existing systems were primarily written in Rust, used Apache Kafka for asynchronous messaging, gRPC for internal synchronous RPCs, and Scylla for data storage.
Design Principles for the Timer System
- The timer system needed to be generic, allowing for various actions beyond just sending notifications.
- They wanted the system to be simple, avoiding complex templating or scripting.
- They decided against using HTTP requests or gRPC requests for actions, as these are synchronous systems.
- They prioritized asynchronous outputs for the timer system to avoid blocking and ensure resilience in case of system failures.
- The system aims to handle timer requests asynchronously to avoid overloading the system with requests from failing systems.
Timer System Architecture
- Apache Kafka is used as a queuing system to isolate the timer system from the performance of the end action system.
- Timers are written to Kafka synchronously via a gRPC call, allowing for controlled latency.
- The timer system receives a gRPC request with an expiry time, a Kafka topic, partition, and data to be written to the Kafka stream.
- When a timer expires, the system writes the corresponding Kafka message to the specified location.
- The timer system is isolated from the consumer that picks up the message and acts on it, ensuring that performance issues with the consumer do not impact the timer system.
Timer Scheduling and Expiration
- The system uses a scheduling service written in Rust to retrieve timers from the gRPC service every minute.
- The scheduling service retrieves timers that expire before a specified time, stores them in memory, and expires them to Kafka as they expire.
- The system uses an arena in memory to store pending timers and an infinite loop with a 1-second period to check for expired timers.
- When a timer expires, an asynchronous task is spawned using Tokio to produce the Kafka event, delete the timer from the gRPC service, and perform additional synchronization.
- The implementation of the timer system was surprisingly simple, with the most complex part being the communication back to the main task to remove timer IDs from the hash set.
- The system was able to spawn a million timers in 350 milliseconds, consuming 600 MB of memory, which was deemed acceptable performance.
- Two key performance metrics were identified: the number of pending timers in the hash set and the timestamp of the last timer expired to Kafka.
- The number of pending timers was used to prevent out-of-memory issues by limiting the number of timers loaded into memory.
- The timestamp of the last expired timer was used to monitor the drift between the timer system and reality, ensuring timely message delivery.
Data Storage with Scylla
- The storage layer needed to support high write throughput (10,000 writes per second) and be easily scalable and maintainable.
- PostgreSQL was considered but deemed too complex for the simple queries required.
- Scylla was chosen as the storage layer due to its scalability, simplicity, and existing use within the Journeys project.
Data Modeling for Scylla
- Data modeling for Scylla required careful consideration due to its different approach compared to relational databases like PostgreSQL.
- The text discusses the importance of data modeling for efficient querying in a NoSQL database like Scylla, which uses SSTables and doesn't support joins or complex indexing.
- The author emphasizes the need to consider query requirements during data storage, particularly for retrieving timers about to expire.
- The text explains that Scylla uses a distributed architecture with nodes, and queries should ideally target a single node for optimal performance.
- Scylla utilizes primary keys with two components: a partitioning key that determines the node and partition, and a clustering key that defines the row's position within the partition.
Optimizing Queries for Expiring Timers
- To efficiently query timers about to expire, the author proposes bucketing timers into 5-minute intervals based on their expiry time.
- A unique ID field (UUID) is introduced as the clustering key to ensure uniqueness within each bucket, allowing multiple timers to exist within the same 5-minute interval.
- The final table design includes the original four fields (expiry timestamp, binary data blob, CCO topic, and partition) along with the new UUID and bucket fields.
- The primary key consists of the bucket field (partitioning key) and the UUID field (clustering key).
- Queries for timers about to expire will target specific buckets based on the rounded-down expiry time, ensuring efficient retrieval of relevant timers.
- The system stores timers in 5-minute buckets within a Scylla database.
- To handle a 10-minute look-ahead interval, the system needs to query multiple buckets.
- A metadata table is introduced to track all existing buckets, allowing efficient querying of relevant buckets within the look-ahead window.
- Every timer insertion triggers an upsert operation in both the timer table and the bucket table.
Timer Retrieval and Processing
- The system retrieves timers from the database, filters them based on the look-ahead window, and sends them to the scheduler.
- The scheduler stores the timers in memory and triggers actions when they expire.
- Expired timers are written to an Apache Kafka topic for further processing.
- The system has been running smoothly for a year and a half, handling billions of concurrent timers.
- However, scaling limitations were identified as the system was designed to support more use cases and users.
Scaling the Timer Service
- Hunter Lane, an engineer on Lily's team, will discuss the scaling efforts undertaken in Q1 of the current year.
- The team wanted to scale their timer service to handle a larger number of timers, especially for integrating with their delivery systems.
- The original timer service was designed for marketing systems, which used a smaller scale of timers.
- Scaling the service vertically (adding resources) was easy, but scaling horizontally (adding more instances) was challenging for the scheduler.
- The scheduler needed to be able to distribute timers among multiple instances to avoid duplication and ensure efficient load sharing.
Horizontal Scaling with Sharding
- To enable horizontal scaling, the team redesigned the data storage to group timers by both time and a new "Shard" identifier.
- They adjusted the Scylla table schemas to include the Shard information when creating new timers.
- The team also reduced the bucket interval from 5 minutes to 1 minute to keep Scylla partitions smaller and improve efficiency.
- Each scheduler instance is responsible for a specific set of Shards, ensuring that each instance retrieves a unique subset of timers.
- The team adjusted the "get timers" requests to include the Shard information, allowing for targeted retrieval of timers.
- The team needed to ensure that each scheduler instance maintains its state to consistently request the same subset of timers.
Stateful Scheduler Deployment
- The video discusses the evolution of a timer service, initially designed to handle a large number of timers, and how it was scaled to handle 10,000 timers per second per node.
- The initial design had a single scheduler, which led to issues with redundancy and potential data loss if the scheduler went down.
- To address these issues, the scheduler was deployed as a stateful set in Kubernetes, providing a stable unique name for each instance and allowing for shard-based timer retrieval.
- This ensured that each timer was only retrieved by one scheduler instance, eliminating redundancy and improving reliability.
Scaled Timer Service Architecture
- The timer service now has multiple instances of both the gRPC service and the scheduler, making it more resilient to outages.
- Each scheduler node can handle 10,000 timers per second, and each gRPC instance can handle 177,000 requests per second.
Timer Service Guarantees and Limitations
- The timer service has an "at least once" guarantee, meaning that timers may be fired multiple times if there are errors during deletion.
- Timers are fired close to, but not exactly at, the scheduled time due to the periodic retrieval process.
- Currently, there is no way to cancel a timer once it has been retrieved by the scheduler.
Future Plans and Open-Sourcing
- The timer service has proven to be valuable within the organization and there are plans to open-source it in the future.
- The team intends to integrate the timer service more broadly across their infrastructure and add features like the ability to cancel timers at any point in their lifecycle.
Scaling Challenges and Solutions
- The team was able to scale their timer system both vertically and horizontally to accommodate future use.
- When adding new nodes to the scheduler, the team would take all nodes down for a restart, resulting in a potential 30-second latency.
- The team initially explored more complex data structures for the scheduler but ultimately found that the simplest approach, using a future and waiting on a task, was sufficient.
System Details and Design Choices
- All timers in the system are in UTC and are scheduled by other teams at OneSignal.
- The system uses a full table scan to retrieve timers, but the number of entries in the table is constrained, limiting the potential for performance issues.
- The team chose Scylla over Cassandra due to their CTO's aversion to Java and has been happy with Scylla since adopting it.
- The team has a single data center for timers with six nodes, but the replication factor is unknown.
- The team does not have any Java code in production.
Potential Issues and Mitigation
- The GRPC service returns a timer multiple times until it expires or the delete timer method is called, leading to data retransmission and potential inefficiency.
- This approach was chosen for its simplicity but could be optimized in the future.
- The speaker acknowledges the possibility of a crashing bug in a particular timer, potentially causing a node to crash.
- However, the attack surface for timers is small due to limited data and Kafka settings, making such issues unlikely.
- There is no automatic recovery mechanism in place for a crashing node, requiring manual intervention by an engineer.