The Question
Distributed High-Throughput Message Queue Design
Design a distributed message queue system capable of handling over 1 million writes per second with high durability and partition-level ordering. The system must support multiple independent consumer groups, allow for message retention up to 7 days, and maintain high availability even during broker failures. Discuss how you would optimize for sequential I/O, manage consumer offsets, and handle data replication across a cluster while ensuring minimal latency.
Sequential I/O
Zero-Copy
Raft
gRPC
NVMe
Page Cache
LSM Log
Questions & Insights
Clarifying Questions
Scale and Throughput: What is the target scale in terms of messages per second (Write QPS) and consumption rate (Read QPS)? (Assumption: 1 Million messages/sec write, multiple consumers per message).
Ordering Guarantees: Is global ordering required, or is partition-level ordering sufficient? (Assumption: Partition-level ordering is sufficient for horizontal scalability).
Durability and Persistence: Should messages be persisted to disk, and what is the retention policy? (Assumption: Messages must be persisted for 7 days; durability is critical).
Delivery Semantics: Are we aiming for At-Least-Once, At-Most-Once, or Exactly-Once delivery? (Assumption: At-Least-Once for the MVP).
Consumer Model: Is this a Push-based (server pushes to consumer) or Pull-based (consumer requests from server) model? (Assumption: Pull-based model to allow consumers to manage their own pace/backpressure).
Thinking Process
Sequential Write Strategy: How do we achieve 1M+ QPS? Use append-only logs on disk to leverage sequential I/O, which is significantly faster than random access.
Partitioning for Parallelism: How do we scale beyond a single machine? Divide "Topics" into "Partitions" distributed across a cluster of brokers.
Consumer Management: How do we handle multiple consumers and progress tracking? Use "Consumer Groups" and store offsets (pointers to the last read message).
High Availability: How do we handle broker failure? Implement leader-follower replication for each partition with an In-Sync Replica (ISR) set.
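The ISR idea in the last point can be sketched as follows. This is a minimal illustration, not a definitive implementation: the PartitionState class, its field names, the min_insync_replicas default of 2, and the offset-based lag rule are all assumptions made for the example (a production system could just as well judge lag by time).

```python
from dataclasses import dataclass, field


@dataclass
class PartitionState:
    """Leader-side view of one partition (hypothetical structure)."""
    leader: str
    # Log end offset (next offset to be written) last reported by each replica.
    log_end_offsets: dict
    isr: set = field(default_factory=set)
    min_insync_replicas: int = 2  # assumed setting, not taken from the text

    def high_watermark(self) -> int:
        """Offsets below this value exist on every in-sync replica."""
        return min(self.log_end_offsets[r] for r in self.isr)

    def can_accept_ack_all(self) -> bool:
        """Refuse ack=all writes when too few replicas are in sync."""
        return len(self.isr) >= self.min_insync_replicas

    def shrink_isr(self, max_lag: int) -> None:
        """Drop followers that trail the leader by more than max_lag offsets."""
        leader_end = self.log_end_offsets[self.leader]
        self.isr = {
            r for r in self.isr
            if r == self.leader or leader_end - self.log_end_offsets[r] <= max_lag
        }


state = PartitionState(
    leader="b1",
    log_end_offsets={"b1": 120, "b2": 118, "b3": 40},
    isr={"b1", "b2", "b3"},
)
print(state.high_watermark())   # 40: the slow replica b3 holds consumers back
state.shrink_isr(max_lag=10)
print(sorted(state.isr))        # ['b1', 'b2']
print(state.high_watermark())   # 118
```

Consumers would only be served offsets below the high watermark, so a message that has been acknowledged with ack=all survives the loss of the leader.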
Bonus Points
Zero-Copy Optimization: Use the sendfile() system call to transfer data directly from the OS Page Cache to the network socket, avoiding copies through user-space buffers and the extra context switches that come with them.
Tiered Storage: Move older log segments to S3/Object Storage to provide "infinite" retention at lower cost while keeping hot data on local NVMe.
Controller Bottleneck Mitigation: Implement a metadata quorum (e.g., KRaft/Raft) instead of relying on external dependencies like ZooKeeper to reduce architectural complexity and improve failover time.
Batching & Compression: Implement end-to-end batching (Producer -> Broker -> Consumer) and end-to-end compression to maximize bandwidth efficiency.
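As a rough illustration of the zero-copy point above, Python's socket.sendfile() delegates to os.sendfile() where the platform supports it and falls back to ordinary send() elsewhere. The helper name send_log_slice, the file contents, and the use of a local socket pair in place of a real broker-to-consumer connection are assumptions for this sketch.

```python
import os
import socket
import tempfile


def send_log_slice(sock: socket.socket, path: str, position: int, length: int) -> int:
    """Send `length` bytes of a segment file, starting at byte `position`.

    With os.sendfile() available, the kernel moves the bytes from the page
    cache to the socket without copying them into this process.
    """
    with open(path, "rb") as segment:
        return sock.sendfile(segment, offset=position, count=length)


# A tiny stand-in for a log segment on disk.
with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(b"msg-0|msg-1|msg-2|")
    segment_path = f.name

# A connected socket pair plays the role of the network connection.
broker_side, consumer_side = socket.socketpair()
sent = send_log_slice(broker_side, segment_path, position=6, length=12)
broker_side.close()

chunks = []
while chunk := consumer_side.recv(1024):  # read until the sender's EOF
    chunks.append(chunk)
received = b"".join(chunks)
consumer_side.close()
os.remove(segment_path)

print(sent, received)  # 12 b'msg-1|msg-2|'
```

The byte position passed in would come from an offset index lookup, so the broker never has to parse or re-serialize the messages it serves.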
Design Breakdown
Functional Requirements
Core Use Cases:
Producers can publish messages to specific topics.
Consumers can subscribe to topics and pull messages.
Support for "Consumer Groups" (load balancing messages across group members).
Retention of messages for a configurable period.
Scope Control:
In-Scope: Topic partitioning, message persistence, offset management, and basic replication.
Out-of-Scope: Complex stream processing (Flink-like), dead-letter queue (DLQ) logic, and advanced message filtering.
Non-Functional Requirements
Scale: 1 Million writes/sec; 100 bytes average message size.
Latency: End-to-end latency (Produce to Consume) < 50ms for the 99th percentile.
Availability & Reliability: 99.99% availability; no data loss on single broker failure.
Consistency: High durability (with ack=all, a write is acknowledged only after every in-sync replica has it) and strict ordering within each partition.
Fault Tolerance: Automatic leader re-election when a broker goes down.
Estimation
Traffic Estimation:
Write QPS: 1,000,000 msg/sec.
Message size: 100 bytes.
Bandwidth In: 1M * 100B = 100 MB/sec (800 Mbps).
Bandwidth Out: 100 MB/sec * 3 (assuming 3 independent consumer groups) = 300 MB/sec.
Storage Estimation:
100 MB/sec * 86,400s (1 day) = ~8.6 TB/day.
7-day retention: ~60 TB.
Replication factor 3: ~180 TB total storage cluster-wide.
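The estimates above can be reproduced with a few lines of arithmetic. The inputs are the figures from this section; the inter-broker replication line is an extra that the estimate above leaves out, and decimal units (1 MB = 10^6 bytes) are assumed.

```python
WRITES_PER_SEC = 1_000_000
MSG_BYTES = 100
CONSUMER_GROUPS = 3
RETENTION_DAYS = 7
REPLICATION_FACTOR = 3

MB = 10**6
TB = 10**12

ingress = WRITES_PER_SEC * MSG_BYTES            # bytes/sec from producers
egress = ingress * CONSUMER_GROUPS              # bytes/sec to consumers
replication = ingress * (REPLICATION_FACTOR - 1)  # follower fetch traffic
per_day = ingress * 86_400
retained = per_day * RETENTION_DAYS
cluster = retained * REPLICATION_FACTOR

print(f"ingress      {ingress / MB:.0f} MB/s ({ingress * 8 / MB:.0f} Mbps)")
print(f"egress       {egress / MB:.0f} MB/s")
print(f"replication  {replication / MB:.0f} MB/s between brokers")
print(f"per day      {per_day / TB:.2f} TB")
print(f"retained     {retained / TB:.2f} TB")
print(f"cluster      {cluster / TB:.2f} TB")
# ingress      100 MB/s (800 Mbps)
# egress       300 MB/s
# replication  200 MB/s between brokers
# per day      8.64 TB
# retained     60.48 TB
# cluster      181.44 TB
```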
Blueprint
Concise Summary: A partitioned, append-only log-based message queue where producers append data to leaders, and consumers pull data using offsets managed in a metadata store.
Major Components:
Producer SDK: Client-side library that handles batching and routing messages to the correct partition leader.
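Broker Cluster: Nodes that store partition logs on local disk, serve produce and fetch requests, and handle replication. Brokers are stateful; cluster-wide coordination state lives in the metadata store.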
Log Storage: Local disk-based append-only files organized by partition for sequential I/O.
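Metadata Store: A consistent distributed store (e.g., KRaft/Etcd) to manage cluster state, broker membership, and partition leaders. Committed consumer offsets are kept in the internal __consumer_offsets topic rather than in this store.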
Simplicity Audit: This design avoids complex push-logic and heavy coordination, focusing on a pull-based log architecture which is the industry standard for high-throughput distributed systems.
Architecture Decision Rationale:
Why this architecture?: The log-structured approach is the most practical way to meet the 1M writes/sec requirement on standard hardware, because it turns every write into a sequential disk append and lets the OS Page Cache serve most reads.
Functional Satisfaction: Covers publishing, subscribing, and retention through log segmentation.
Non-functional Satisfaction: Scalability is achieved by adding partitions; Availability is achieved through ISR-based replication.
High Level Architecture
Sub-system Deep Dive
Service
Topology & Scaling:
Brokers are organized in a cluster. Each Broker is a leader for some partitions and a follower for others to balance the load.
Scaling is done by adding more Brokers and re-distributing partitions.
API Schema Design:
Produce(topic, partition, messages[]): gRPC call. Returns offset and timestamp.
Fetch(topic, partition, offset, max_bytes): gRPC call. Returns message batch.
CommitOffset(group_id, topic, partition, offset): Saves progress.
Resilience & Reliability:
Retries: Producers use exponential backoff on "Leader Not Available" errors.
ACKs: Producers can choose ack=0 (no wait), ack=1 (leader only), or ack=all (full ISR quorum).
Observability:
Metrics: Under-replicated partitions, Message In/Out rate, Consumer Lag (critical).
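The producer retry behavior described under Resilience & Reliability might look like the sketch below. The LeaderNotAvailable exception, the function names, and the default delays are hypothetical; full jitter is added on top of the exponential schedule so that many producers do not retry in lockstep after a leader change.

```python
import random
import time


class LeaderNotAvailable(Exception):
    """Hypothetical error raised while a partition has no reachable leader."""


def backoff_delays(base=0.05, cap=2.0, attempts=5, rng=random.random):
    """Yield jittered delays, each uniform in [0, min(cap, base * 2**n))."""
    for n in range(attempts):
        yield rng() * min(cap, base * 2 ** n)


def produce_with_retry(send, batch, sleep=time.sleep, **backoff):
    """Call send(batch), retrying on LeaderNotAvailable with backoff."""
    for delay in backoff_delays(**backoff):
        try:
            return send(batch)
        except LeaderNotAvailable:
            sleep(delay)  # a real client would also refresh its metadata here
    return send(batch)  # final attempt; any error now propagates to the caller


# Demo: a fake send function that fails twice, then succeeds.
calls = {"n": 0}


def flaky_send(batch):
    calls["n"] += 1
    if calls["n"] <= 2:
        raise LeaderNotAvailable()
    return {"offset": 42, "count": len(batch)}


result = produce_with_retry(flaky_send, [b"a", b"b"], sleep=lambda s: None)
print(result, calls["n"])  # {'offset': 42, 'count': 2} 3
```

Because a retried batch may already have been written before the error surfaced, this pattern gives at-least-once delivery, which matches the MVP assumption stated earlier.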
Storage
Access Pattern:
Append-only (Sequential Write).
Offset-based lookup (Sequential Read).
Log Design:
Each Partition is a directory.
Data is split into Segments (e.g., 1GB files) to facilitate easy deletion/retention.
Index files map offsets to physical byte positions in the segment file.
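A minimal sketch of the offset lookup this layout enables: binary-search for the segment whose base offset covers the requested offset, then binary-search that segment's index for the nearest entry at or below it. The Segment and PartitionLog classes and the sparse (one entry per N messages) index are assumptions for illustration.

```python
import bisect
from dataclasses import dataclass, field


@dataclass
class Segment:
    base_offset: int  # offset of the first message in this segment file
    # Sparse index: (offset, byte position) pairs in ascending order.
    index: list = field(default_factory=list)


class PartitionLog:
    def __init__(self, segments):
        self.segments = sorted(segments, key=lambda s: s.base_offset)
        self._bases = [s.base_offset for s in self.segments]

    def locate(self, offset: int):
        """Return (segment, byte position to start scanning from)."""
        i = bisect.bisect_right(self._bases, offset) - 1
        if i < 0:
            raise KeyError(f"offset {offset} is older than the retained log")
        seg = self.segments[i]
        # Last index entry whose offset is <= the requested offset.
        j = bisect.bisect_right(seg.index, (offset, float("inf"))) - 1
        position = seg.index[j][1] if j >= 0 else 0
        return seg, position

    def drop_oldest(self) -> Segment:
        """Retention deletes whole segments, never individual messages."""
        self._bases.pop(0)
        return self.segments.pop(0)


log = PartitionLog([
    Segment(0, [(0, 0), (100, 4096), (200, 8192)]),
    Segment(1000, [(1000, 0), (1100, 4100)]),
])
seg, pos = log.locate(150)
print(seg.base_offset, pos)   # 0 4096
seg, pos = log.locate(1234)
print(seg.base_offset, pos)   # 1000 4100
```

From the returned position the broker scans forward sequentially to the exact message. The sketch does not check the upper bound of the log, which a real broker would do against its log end offset.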
Technical Selection:
Local NVMe/SSD: High throughput sequential writes.
XFS/Ext4: Standard Linux filesystems with good large-file handling.
Distribution Logic:
Partitioning: Hash(Key) % NumPartitions or Round-robin if no key.
Replication: Leader handles all reads/writes. Follower fetches from leader.
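The partitioning rule above can be sketched as a small client-side class. CRC32 is used here only because it is stable across processes (Python's built-in hash() is randomized per process); the choice of hash function and the Partitioner name are assumptions, not something the design prescribes.

```python
import itertools
import zlib
from typing import Optional


class Partitioner:
    def __init__(self, num_partitions: int):
        self.num_partitions = num_partitions
        self._round_robin = itertools.cycle(range(num_partitions))

    def partition_for(self, key: Optional[bytes]) -> int:
        if key is None:
            # No key: spread messages evenly, with no ordering guarantee.
            return next(self._round_robin)
        # Same key always maps to the same partition, preserving per-key order.
        return zlib.crc32(key) % self.num_partitions


p = Partitioner(num_partitions=4)
print([p.partition_for(None) for _ in range(5)])   # [0, 1, 2, 3, 0]
print(p.partition_for(b"user-17") == p.partition_for(b"user-17"))  # True
```

Note that changing NumPartitions remaps existing keys under a plain modulo scheme, so per-key ordering only holds while the partition count stays fixed.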
Cache
Purpose & Justification: We rely on the OS Page Cache instead of an application-level cache (like Redis).
Strategy: By using the Page Cache, we avoid double-buffering (the same data held once in the application heap, such as a JVM heap, and again in the OS cache). If a consumer is "caught up," the data it needs is likely already in OS memory from the recent write.
Failure Handling: Clean shutdown flushes cache to disk; hard crashes rely on replication from other nodes.
Messaging
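Purpose & Decoupling: The system being designed is itself the messaging layer, so no separate queue sits in front of it. Its own bookkeeping is stored in internal topics.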
Internal Topic Schema:
__consumer_offsets: Internal topic used to store the last committed offset for every consumer group/partition pair to ensure durability of consumption progress.
Throughput & Partitioning:
To handle 1M writes/sec, we might need 100-200 partitions distributed across 10-20 brokers, which works out to roughly 5,000-10,000 messages/sec (0.5-1 MB/sec) per partition.
Failure Handling: Dead-letter queues are handled by the Consumer logic (if a message fails processing, the consumer writes it to a "retry-topic").
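The offset bookkeeping described for __consumer_offsets can be sketched as an append-only log of commits plus a last-write-wins view keyed by (group, topic, partition). The OffsetStore class is a hypothetical in-memory stand-in, and the sketch assumes a committed offset means "next offset to read".

```python
class OffsetStore:
    """In-memory stand-in for the __consumer_offsets topic."""

    def __init__(self):
        self.log = []      # every commit, in arrival order
        self.latest = {}   # (group_id, topic, partition) -> committed offset

    def commit(self, group_id, topic, partition, offset):
        key = (group_id, topic, partition)
        self.log.append((key, offset))
        self.latest[key] = offset  # the newest commit for a key wins

    def fetch(self, group_id, topic, partition, default=0):
        return self.latest.get((group_id, topic, partition), default)

    def lag(self, group_id, topic, partition, log_end_offset):
        """Consumer lag: messages written but not yet committed as read."""
        return log_end_offset - self.fetch(group_id, topic, partition)

    @classmethod
    def replay(cls, log):
        """Rebuild the view after a restart by re-reading the commit log."""
        store = cls()
        for (group_id, topic, partition), offset in log:
            store.commit(group_id, topic, partition, offset)
        return store


store = OffsetStore()
store.commit("billing", "orders", 0, 500)
store.commit("billing", "orders", 0, 750)
store.commit("analytics", "orders", 0, 120)   # groups progress independently

print(store.fetch("billing", "orders", 0))            # 750
print(store.lag("analytics", "orders", 0, 1000))      # 880
print(OffsetStore.replay(store.log).latest == store.latest)  # True
```

For at-least-once delivery a consumer would call commit only after it has finished processing a batch, accepting that a crash between processing and commit causes the batch to be read again.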
Infrastructure (Optional)
Distributed Coordination:
Metadata Store: KRaft or Etcd.
Used for: Leader election, Broker membership (heartbeats), and Partition-to-Broker mapping.
Rationale: High consistency (Linearizability) is required for metadata to prevent split-brain scenarios.
Wrap Up
Advanced Topics
Trade-offs (PACELC): In the event of a network partition, the system chooses Consistency (C) over Availability (A) for writes. With ack=all, it stops accepting writes to a topic partition whose ISR has fallen below the required size, which prevents data loss.
Reliability: Use of Checksums for every message to detect disk corruption.
Bottleneck Analysis:
Network I/O: Usually the first bottleneck. Solved by batching and compression.
Disk I/O: Solved by sequential access and Page Cache.
Distinguishing Insights:
Pull vs Push: Pull is superior for high scale because it allows consumers to handle their own memory management. In a push model, a slow consumer could cause the broker to buffer massive amounts of data in memory, leading to OOM.
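Partition Rebalancing: Avoid frequent rebalancing, since moving a partition to another broker means copying its log over the network. Use "Consistent Hashing" for partition assignments so that a membership change moves only a small share of partitions, or require a manual control-plane trigger.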