The Question
Design

Design a Distributed High-Throughput Message Queue

Design a distributed message queue system capable of handling 1 million messages per second. The system should support multiple producers and consumers, provide partition-level ordering, and guarantee at-least-once delivery. Discuss how you would handle persistence, horizontal scaling, and broker failures while maintaining low latency for real-time applications.
Tags: Etcd, Zero-copy, TCP, gRPC, Append-only Log, Raft, ISR, Page Cache
Questions & Insights

Clarifying Questions

What are the primary performance targets? (e.g., Throughput in messages per second, latency for end-to-end delivery).
What are the durability and delivery guarantees? (e.g., At-least-once, Exactly-once, or Best-effort; must messages survive a total broker crash?).
What is the consumption model? (e.g., Pull-based like Kafka or Push-based like RabbitMQ?).
Is strict ordering required? (e.g., Global ordering vs. Partition-level ordering).
What is the average message size and retention policy? (e.g., 1KB messages, 7-day retention).
Assumptions for MVP:
Scale: 1 Million messages per second (Write-heavy).
Persistence: Messages must be persisted to disk (Durability).
Ordering: Partition-level strict ordering.
Delivery: At-least-once delivery guarantee.
Model: Pull-based model for consumer flexibility.

Thinking Process

Core Bottleneck: Disk I/O and network bandwidth are the primary constraints when handling millions of messages.
Key Strategy: Use Append-only Logs to turn random writes into sequential writes, and Zero-copy (sendfile) to avoid copying data through user-space buffers and to reduce context switches during reads.
Progressive Logic:
How do we store messages efficiently? (Sequential Disk I/O).
How do we scale horizontally? (Partitioning/Sharding).
How do we ensure high availability? (Leader-Follower Replication).
How do we manage metadata? (Centralized Coordination service).
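
The zero-copy read path mentioned in the Key Strategy can be sketched in a few lines. This is a minimal illustration, not a broker implementation: the function name send_log_range and its parameters are hypothetical. Python's socket.sendfile() uses os.sendfile() where the platform supports it and falls back to ordinary send() calls otherwise, so the zero-copy behaviour is only an assumption about the host OS.

```python
import socket


def send_log_range(conn: socket.socket, path: str, offset: int, count: int) -> int:
    """Send `count` bytes of the segment file at `path`, starting at byte `offset`.

    Returns the number of bytes sent. The file contents are never read into
    a Python bytes object when the OS-level sendfile path is used.
    """
    with open(path, "rb") as segment:
        # Requires a blocking (or timeout-based) SOCK_STREAM socket.
        return conn.sendfile(segment, offset=offset, count=count)
```

A broker would call something like this from its Fetch handler after translating a message offset into a byte position in the segment file.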

Bonus Points

Zero-Copy Optimization: Utilizing the Linux sendfile system call to transfer data directly from the OS Page Cache to the Network Socket, bypassing user-space buffers.
Batching & Compression: Aggregating messages on the producer side and compressing them (e.g., Zstd, Snappy) to reduce both IOPS and network bandwidth.
Log Compaction: Implementing a mechanism to retain only the last known value for a specific key, optimizing storage for state-heavy workloads (like K-V stores built on logs).
ISR (In-Sync Replicas) Model: Balancing between strong consistency and availability by dynamically managing the set of replicas that are caught up with the leader.
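
Log compaction, as described above, keeps only the most recent record for each key. The sketch below shows the idea on an in-memory list; the Record layout (offset, key, value) and the function name compact are assumptions made for illustration, and a real broker would rewrite segment files in the background instead.

```python
Record = tuple[int, str, bytes]  # (offset, key, value), hypothetical layout


def compact(records: list[Record]) -> list[Record]:
    """Return the records that survive compaction, in offset order."""
    latest: dict[str, Record] = {}
    for record in records:
        # Records arrive in offset order, so a later one replaces an earlier one.
        latest[record[1]] = record
    # Surviving records keep their original offsets; they are not renumbered.
    return sorted(latest.values(), key=lambda r: r[0])
```

Keeping the original offsets means a consumer's stored position stays valid after compaction; it simply skips over the gaps.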
Design Breakdown

Functional Requirements

Core Use Cases:
Producers: Can send messages to specific "Topics".
Consumers: Can subscribe to topics and pull messages starting from a specific offset.
Persistence: Messages are stored for a configurable period (TTL) or size limit.
Scope Control:
In-scope: Distributed storage, partitioning, replication, and basic metadata management.
Out-of-scope: Message transformation (ETL), complex routing headers (like RabbitMQ Exchanges), and Dead Letter Queue (DLQ) management for the MVP.

Non-Functional Requirements

Scale: Support 1M+ msg/s ingress and 3M+ msg/s egress (assuming three consumer groups each read every message).
Latency: Sub-10ms for producer acknowledgement; sub-50ms for end-to-end delivery (p99).
Availability & Reliability: 99.99% availability; no data loss upon single node failure (N+1 redundancy).
Consistency: Partition-level ordering; read-your-writes consistency when reading from the partition leader.
Fault Tolerance: Automatic leader election if a broker fails.

Estimation

Traffic Estimation:
Write QPS: 1,000,000 msgs/s.
Read QPS: 3,000,000 msgs/s (3 consumer groups).
Storage Estimation:
1KB per message * 1M msgs/s = 1GB/s.
Daily storage: 1GB/s * 86,400s ≈ 86.4TB.
With 3x replication: ~259TB per day.
Bandwidth Estimation:
Inbound: 1GB/s.
Outbound: 3GB/s.
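
The arithmetic above can be reproduced with a short script. It uses decimal units (1KB = 1,000 bytes) and the assumptions already stated: 1KB messages, three consumer groups, 3x replication. The inter-broker replication figure is an extra derived number, assuming each follower fetches a full copy of every write from the leader. Using unrounded figures, the replicated daily total comes out near 259TB.

```python
def estimate(msg_bytes=1_000, write_rate=1_000_000, consumer_groups=3,
             replication=3, seconds_per_day=86_400):
    """Back-of-the-envelope capacity numbers for the queue."""
    ingress = msg_bytes * write_rate  # bytes per second written by producers
    return {
        "ingress_gb_s": ingress / 1e9,
        "egress_gb_s": ingress * consumer_groups / 1e9,
        # Each write is copied to (replication - 1) followers.
        "replication_gb_s": ingress * (replication - 1) / 1e9,
        "daily_tb": ingress * seconds_per_day / 1e12,
        "daily_replicated_tb": ingress * seconds_per_day * replication / 1e12,
    }


if __name__ == "__main__":
    for name, value in estimate().items():
        print(f"{name}: {value:.1f}")
```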

Blueprint

Concise Summary: A distributed, partitioned, append-only commit log system where producers append data to the end of logs and consumers pull data using offsets.
Major Components:
Producer SDK: Client-side library that handles batching, partitioning logic, and retries.
Broker Cluster: The core storage nodes that manage partitions, handle writes to append-only logs, and serve consumer fetch requests.
Coordination Service (Etcd): Manages cluster metadata, including broker registration, topic/partition locations, and leader election.
Consumer SDK: Client library that manages offsets and pull-based fetching logic.
Simplicity Audit: This design avoids complex "Push" state management on the broker by shifting offset tracking to the consumer or a simple metadata store, adhering to the YAGNI principle for scaling.
Architecture Decision Rationale:
Append-only Log: Sequential appends avoid seeks and random I/O, which is what sustains high write throughput on physical disks.
Partitioning: Essential for horizontal scaling; one topic can span many brokers.
Pull-model: Prevents overwhelming slow consumers and allows batching of reads.

High Level Architecture

Sub-system Deep Dive

Service

Topology & Scaling:
Stateless frontend logic but stateful storage.
Scaled by adding more Broker nodes and rebalancing partitions.
API Schema Design:
Produce(topic, partition, message_batch) -> REST/gRPC. Returns base_offset.
Fetch(topic, partition, offset, max_bytes) -> gRPC. Returns message_list + next_offset.
GetMetadata(topic) -> Returns broker addresses for partitions.
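
To make the Produce and Fetch contracts concrete, here is a minimal in-memory sketch of a single partition. The class name PartitionLog is hypothetical, and one behaviour is an assumption not stated above: fetch always returns at least one message when one is available, even if it exceeds max_bytes, so that a consumer cannot get stuck on an oversized message.

```python
from dataclasses import dataclass, field


@dataclass
class PartitionLog:
    """In-memory stand-in for one partition's append-only log."""
    messages: list[bytes] = field(default_factory=list)

    def produce(self, batch: list[bytes]) -> int:
        """Append a batch and return the offset assigned to its first message."""
        base_offset = len(self.messages)
        self.messages.extend(batch)
        return base_offset

    def fetch(self, offset: int, max_bytes: int) -> tuple[list[bytes], int]:
        """Return (message_list, next_offset) starting at `offset`."""
        out: list[bytes] = []
        size = 0
        for msg in self.messages[offset:]:
            if out and size + len(msg) > max_bytes:
                break
            out.append(msg)
            size += len(msg)
        return out, offset + len(out)
```

A consumer loop would repeatedly call fetch with the next_offset returned by the previous call, and commit that offset only after processing, which is what yields at-least-once delivery.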
Resilience & Reliability:
Leader-Follower: Each partition has 1 Leader and N Followers. All writes go to the Leader.
Ack Levels: ack=0 (fire and forget), ack=1 (leader only), ack=all (every replica currently in the ISR has the message; this is not a majority quorum).
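
The three ack levels can be expressed as a small decision function. This is a sketch under Kafka-style assumptions that the text does not spell out: each replica tracks a log end offset (the offset of the next message it will write), and the high watermark is the smallest log end offset across the leader and the in-sync followers. The names high_watermark and can_ack are hypothetical.

```python
def high_watermark(leader_end: int, isr_follower_ends: dict[str, int]) -> int:
    """Smallest log end offset among the leader and all in-sync followers."""
    return min([leader_end, *isr_follower_ends.values()])


def can_ack(ack: str, last_offset: int, leader_end: int,
            isr_follower_ends: dict[str, int]) -> bool:
    """Decide whether a produce request ending at `last_offset` may be acknowledged."""
    if ack == "0":
        return True  # producer does not wait for any confirmation
    if ack == "1":
        return leader_end > last_offset  # leader has appended the batch
    if ack == "all":
        # Every in-sync replica has replicated past the batch.
        return high_watermark(leader_end, isr_follower_ends) > last_offset
    raise ValueError(f"unknown ack level: {ack!r}")
```

If a slow follower is dropped from the ISR, it no longer holds back the high watermark, which is how the ISR model trades some redundancy for availability.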
Observability:
Key Metric: Consumer Lag (Current Log End Offset - Consumer Offset).
Broker metrics: Disk I/O utilization, Network throughput, Request latency.

Storage

Access Pattern: 100% sequential writes (append); 90% sequential reads (tailing the log).
Database Table Design (File System Structure):
Segment Files: 000000001.log, 000000500.log. Each file is named after the offset of its first message and holds the raw message bytes.
Index Files: 000000001.index. Maps offset -> physical_position_in_file.
Time Index: Maps timestamp -> offset.
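
Resolving a message offset to a byte position takes two lookups: find the segment whose base offset is the largest one not exceeding the target, then consult that segment's index. The sketch below assumes a sparse index (only some offsets are indexed, and the reader scans forward from the nearest earlier entry); the text does not say whether the index is sparse or dense, and all function names here are hypothetical.

```python
import bisect


def segment_filename(base_offset: int) -> str:
    """Zero-padded file name matching the 000000001.log convention."""
    return f"{base_offset:09d}.log"


def find_segment(base_offsets: list[int], offset: int) -> int:
    """Return the base offset of the segment containing `offset` (list is sorted)."""
    i = bisect.bisect_right(base_offsets, offset) - 1
    if i < 0:
        raise KeyError(f"offset {offset} is older than the first retained segment")
    return base_offsets[i]


def find_position(index: list[tuple[int, int]], offset: int) -> int:
    """Return the byte position to start scanning from.

    `index` holds sorted (offset, byte_position) pairs for one segment.
    """
    offsets = [entry[0] for entry in index]
    i = bisect.bisect_right(offsets, offset) - 1
    return index[i][1] if i >= 0 else 0
```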
Technical Selection: Local Disk (XFS or EXT4) with a focus on Sequential I/O. RAID 10 for local redundancy and performance.
Distribution Logic: Partitioning based on hash(key) % num_partitions.
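
A minimal sketch of the hash(key) % num_partitions rule follows. The choice of CRC32 is an assumption made for illustration. Python's built-in hash() is unsuitable here because it is salted per process for strings and bytes, so two producers would disagree on the partition for the same key.

```python
import zlib


def partition_for(key: bytes, num_partitions: int) -> int:
    """Map a message key to a partition, identically in every process."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    return zlib.crc32(key) % num_partitions
```

Because the result depends on num_partitions, adding partitions to a topic changes where existing keys land, which breaks per-key ordering across the change.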

Cache

Purpose & Justification: We rely on the OS Page Cache (Kernel-level caching) instead of an application-level cache (like Redis).
Strategy: When a producer writes, it goes to the Page Cache and is periodically flushed to disk (fsync). When a consumer reads the "tail" of the log, it is served directly from the Page Cache, resulting in near-memory speeds.
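Failure Handling: If the machine crashes or loses power, un-fsynced data in the Page Cache is lost. A crash of the broker process alone does not lose it, because the kernel still holds the dirty pages and writes them back. The machine-level risk is mitigated by replicating to other nodes' Page Caches.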
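
The write path described in this section can be sketched as an append that lands in the Page Cache plus a periodic fsync. The class name SegmentWriter and the flush_every threshold are hypothetical; a real broker might flush on a timer instead of, or in addition to, a message count.

```python
import os


class SegmentWriter:
    """Appends to a segment file and fsyncs every `flush_every` appends."""

    def __init__(self, path: str, flush_every: int = 1000):
        self.fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o644)
        self.flush_every = flush_every
        self.unsynced = 0  # appends that are only in the Page Cache so far

    def append(self, payload: bytes) -> None:
        # os.write returns once the bytes are in the Page Cache, not on disk.
        # (A production writer would also loop on short writes.)
        os.write(self.fd, payload)
        self.unsynced += 1
        if self.unsynced >= self.flush_every:
            self.sync()

    def sync(self) -> None:
        os.fsync(self.fd)  # force dirty pages for this file to stable storage
        self.unsynced = 0

    def close(self) -> None:
        self.sync()
        os.close(self.fd)
```

A larger flush_every improves throughput but widens the window of data that exists only in memory on this node, which is why acknowledgement is tied to replication in this design.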

Data Processing

Processing Model: Uses Etcd or ZooKeeper for distributed coordination.
Function:
Broker Registry: Heartbeats to detect node failures.
Leader Election: Elects a leader for each partition through the Coordination Service, which is itself replicated with a consensus protocol (Raft in Etcd, ZAB in ZooKeeper).
Topic Configuration: Stores partition counts and replication factors.
Technical Selection: Etcd. Rationale: Strong consistency (CP), high reliability, and excellent support for "watch" semantics to notify brokers of changes.
Wrap Up

Advanced Topics

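Trade-offs (PACELC): In the event of a network partition (P), we choose Consistency (C) over Availability (A) for the affected partitions to ensure no data divergence (using the ISR model). Otherwise (E), the producer's ack level sets the trade-off between Latency (L) and Consistency (C).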
Reliability: Use a Write-Ahead Log (WAL) pattern. Every message is appended before the producer is acknowledged.
Bottleneck Analysis:
Hot Partitions: If a specific key is too frequent, one partition becomes a bottleneck. Mitigation: Use a random partitioner if ordering isn't required for that key.
Disk Space: Mitigation: Retention policy deletes old segments based on time or size.
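
The retention policy can be sketched as a function that picks whole segments to delete. The tuple layout (base_offset, size_bytes, last_modified) and the function name are assumptions for illustration. The sketch never deletes the newest segment, on the assumption that it is still being written, and it stops at the first segment it keeps so that the retained log stays contiguous.

```python
def segments_to_delete(segments: list[tuple[int, int, float]], now: float,
                       max_age_s: float, max_total_bytes: int) -> list[int]:
    """Return base offsets of segments to delete; `segments` is sorted oldest first."""
    total = sum(size for _, size, _ in segments)
    doomed: list[int] = []
    for base, size, mtime in segments[:-1]:  # skip the active (last) segment
        too_old = now - mtime > max_age_s
        too_big = total > max_total_bytes
        if not (too_old or too_big):
            break
        doomed.append(base)
        total -= size
    return doomed
```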
Security: TLS for data in transit. SASL/PLAIN or OAuth2 for producer/consumer authentication. ACLs at the topic level.
Distinguishing Insights:
Batching: Small messages kill performance. The Producer SDK must buffer messages and send them in chunks (e.g., 64KB or 10ms window).
Segment-based Storage: Don't use one giant file. Use segments (e.g., 1GB each) to make deletions easy (just delete the file, no expensive random-access I/O).
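
The producer-side batching rule (send when the buffer reaches a size limit or a time window elapses, whichever comes first) can be sketched as below. The class name BatchAccumulator is hypothetical, and the defaults mirror the 64KB and 10ms examples given above. Time is passed in explicitly so the logic is easy to test; a real SDK would likely pass time.monotonic() and call poll from a background thread.

```python
from typing import Optional


class BatchAccumulator:
    """Buffers messages until a size or linger-time threshold is reached."""

    def __init__(self, max_bytes: int = 64 * 1024, linger_s: float = 0.010):
        self.max_bytes = max_bytes
        self.linger_s = linger_s
        self.buffer: list[bytes] = []
        self.size = 0
        self.first_at: Optional[float] = None  # arrival time of oldest buffered message

    def add(self, message: bytes, now: float) -> Optional[list[bytes]]:
        """Buffer a message; return a batch if one is ready to send."""
        if not self.buffer:
            self.first_at = now
        self.buffer.append(message)
        self.size += len(message)
        return self.poll(now)

    def poll(self, now: float) -> Optional[list[bytes]]:
        """Return the buffered batch if it is full or has waited long enough."""
        if not self.buffer:
            return None
        full = self.size >= self.max_bytes
        expired = now - self.first_at >= self.linger_s
        if not (full or expired):
            return None
        batch, self.buffer, self.size = self.buffer, [], 0
        return batch
```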