The Question
Design

Distributed Persistent Message Log System

Design a high-throughput, distributed, and fault-tolerant message queue system capable of handling millions of events per second. The system must provide strict ordering within a partition, support long-term data retention, and ensure zero data loss during broker failures. Detail the storage engine, replication mechanism, and how you would optimize for maximum disk and network I/O performance.
Tags: Raft, ISR, Zero-Copy, Page Cache, LSM-Tree, TCP, S3, NVMe
Questions & Insights

Clarifying Questions

What is the target scale for throughput and storage?
Assumption: We need to support 10 million messages per second (write) and 50 million messages per second (read) with 1KB average message size. Data should be retained for 7 days.
What are the durability and consistency requirements?
Assumption: High durability is required (no data loss). We will implement an In-Sync Replica (ISR) model to balance consistency and availability.
How is metadata managed?
Assumption: To follow modern standards and reduce architectural complexity, we will use a self-managed metadata quorum (similar to KRaft/Raft) instead of relying on external dependencies like ZooKeeper.
What are the ordering guarantees?
Assumption: Strict ordering is required per-partition, but not across the entire topic.

Thinking Process

Core Bottleneck: Disk I/O and network saturation are the primary constraints when handling millions of messages per second.
Strategy:
Log-Structured Storage: Use append-only files to leverage sequential disk I/O, which is significantly faster than random access.
Zero-Copy Optimization: Utilize sendfile to transfer data from the OS Page Cache directly to the NIC buffer, bypassing user-space copying.
Partitioning & Replication: Horizontally scale via partitions and ensure high availability through leader-follower replication.
Pull-based Consumption: Offload flow control to consumers, allowing them to process data at their own pace.
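The zero-copy idea in the strategy list can be illustrated with a minimal sketch. It assumes the broker serves a byte range of a segment file to an already-connected consumer socket; the function name send_segment_zero_copy and its parameters are hypothetical. Python's socket.sendfile() uses os.sendfile() where the platform supports it and falls back to plain send() otherwise.

```python
import socket


def send_segment_zero_copy(conn: socket.socket, path: str, offset: int, count: int) -> int:
    """Send `count` bytes of a segment file starting at byte `offset`; return bytes sent."""
    with open(path, "rb") as segment:
        # Where os.sendfile() is available, the kernel moves the bytes from the
        # page cache to the socket without copying them into this process.
        # On other platforms socket.sendfile() falls back to regular send() calls.
        return conn.sendfile(segment, offset=offset, count=count)
```

One caveat on the design choice: this path only helps while the broker sends the bytes exactly as stored. If the broker must transform the data (for example, encrypting it in user space), the bytes have to pass through the process again.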

Bonus Points

Log Compaction: Support state-sharing use cases where only the latest value for a specific key is preserved, optimizing storage for "current state" topics.
Tiered Storage: Move older log segments from expensive local NVMe/SSD to cheaper Object Storage (S3/GCS) to allow for "infinite" retention without scaling the broker cluster linearly.
Batching & Compression: Implement end-to-end batching from Producer to Broker to Consumer to maximize throughput and reduce per-message overhead.
Quorum-based Metadata: Use a Raft-based controller quorum to avoid the "split-brain" and "herd effect" problems common in legacy ZooKeeper-based deployments.
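Log compaction, as described in the list above, can be sketched as a single pass that keeps only the newest record per key. This is an illustration under assumptions, not the design's actual compactor: the Record tuple layout is hypothetical, and treating a None value as a delete marker (tombstone) is an assumption the text does not state.

```python
from typing import Dict, Iterable, List, Optional, Tuple

# Hypothetical record layout: (offset, key, value). A value of None is
# treated as a tombstone, i.e. "this key was deleted" (an assumption).
Record = Tuple[int, str, Optional[bytes]]


def compact(records: Iterable[Record]) -> List[Record]:
    """Keep the latest record per key, drop tombstoned keys, preserve offset order."""
    latest: Dict[str, Record] = {}
    for record in records:
        latest[record[1]] = record  # later offsets overwrite earlier ones
    survivors = [r for r in latest.values() if r[2] is not None]
    return sorted(survivors, key=lambda r: r[0])
```

Surviving records keep their original offsets, so consumers that stored an offset before compaction can still resume from it.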
Design Breakdown

Functional Requirements

Produce: Clients can publish messages to specific topics.
Consume: Clients can subscribe to topics and read messages.
Persistence: Messages are stored on disk for a configurable retention period.
Ordering: Messages within a partition must be delivered in the order they were written.
Replayability: Consumers can reset their offsets to re-read historical data.

Non-Functional Requirements

High Throughput: Support GBs/sec of aggregate data flow.
Low Latency: End-to-end latency (produce to consume) under 50ms at the 99th percentile.
Scalability: Cluster should scale linearly by adding more brokers.
Fault Tolerance: No data loss on single node failure; high availability for reads/writes.

Estimation

Write Throughput: 10M msgs/sec * 1KB = 10 GB/sec.
Read Throughput: 10 GB/sec * 5 = 50 GB/sec (assuming each message is read by 5 consumers).
Storage (7 days): 10 GB/sec * 86,400 sec/day * 7 days * 3 (replication) ≈ 18 PB.
Network: 10 GB/sec (in) + 20 GB/sec (replication traffic: two follower copies at replication factor 3) + 50 GB/sec (out) = 80 GB/sec. A 100Gbps NIC carries at most 12.5 GB/sec, so this requires a large fleet of brokers with 100Gbps NICs.
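The arithmetic can be reproduced with a short script. It is a back-of-the-envelope sketch with stated assumptions: decimal units (1 KB = 1,000 bytes), replication traffic counted as (replication factor - 1) follower copies of the ingress stream, and a NIC-only lower bound on broker count that ignores headroom, disk limits, and uneven partition load. The function name estimate and its parameter names are hypothetical.

```python
import math


def estimate(msgs_per_sec=10_000_000, msg_bytes=1_000, fanout=5,
             replication_factor=3, retention_days=7, nic_gbps=100):
    """Return capacity figures in GB/sec and PB (decimal units)."""
    GB, PB = 10**9, 10**15
    write = msgs_per_sec * msg_bytes / GB            # producer ingress
    read = write * fanout                            # consumer egress
    replication = write * (replication_factor - 1)   # assumption: followers only
    network = write + replication + read
    storage_pb = (msgs_per_sec * msg_bytes * 86_400
                  * retention_days * replication_factor) / PB
    nic_gb_per_sec = nic_gbps / 8                    # 100 Gbps = 12.5 GB/sec
    return {
        "write_gb_s": write,
        "read_gb_s": read,
        "replication_gb_s": replication,
        "network_gb_s": network,
        "storage_pb": storage_pb,
        # Absolute floor from NIC bandwidth alone; real fleets need far more.
        "min_brokers_by_nic": math.ceil(network / nic_gb_per_sec),
    }
```

Changing fanout or replication_factor shows quickly which term dominates: at these numbers consumer egress is the largest share of network traffic, not replication.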

Blueprint

Concise Summary: A distributed, partitioned, replicated commit log service where producers append data to the tail and consumers poll data using offsets.
Major Components:
Producer SDK: Handles batching, partitioning logic, and retries.
Broker Node: Manages log storage, partition leaders, and follower replication.
Controller Quorum: Manages cluster metadata, partition leadership, and broker health using Raft.
Storage Engine: Handles append-only log segments and sparse indexes.
Simplicity Audit: This design avoids external dependencies (ZooKeeper) and focuses on the core primitive of the "distributed log." It skips complex features like Stream Processing (Kafka Streams) or Connectors for the MVP.
Architecture Decision Rationale:
Why this architecture?: The append-only log is the most efficient way to achieve high-throughput persistence. Decoupling producers and consumers via a pull-model ensures the system is resilient to slow consumers.
Functional Satisfaction: Partitioning satisfies scale; ISR replication satisfies durability.
Non-functional Satisfaction: Zero-copy and sequential I/O satisfy performance; Raft-based metadata satisfies reliability.

High Level Architecture

Sub-system Deep Dive

Service

Topology & Scaling
Stateless/Stateful: Brokers are stateful as they hold partition data.
Scaling: Horizontal scaling by adding brokers and reassigning partitions.
Isolation: Topics are the logical boundary; partitions are the physical scaling unit.
API Schema Design
ProduceRequest: Topic, Partition, Batch(Messages), RequiredAcks. (TCP/Binary Protocol).
FetchRequest: Topic, Partition, Offset, MaxBytes.
MetadataRequest: TopicList. (Returns leader addresses).
OffsetCommit: ConsumerGroup, Partition, Offset.
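To make the "binary protocol" idea concrete, here is a minimal sketch of how a FetchRequest with the fields listed above might be framed. The byte layout (big-endian, 16-bit length-prefixed topic name) is purely hypothetical and is not the wire format of Kafka or any real system.

```python
import struct
from dataclasses import dataclass


@dataclass(frozen=True)
class FetchRequest:
    topic: str
    partition: int
    offset: int
    max_bytes: int

    def encode(self) -> bytes:
        name = self.topic.encode("utf-8")
        # Hypothetical layout: u16 topic length, topic bytes,
        # i32 partition, i64 offset, i32 max_bytes (all big-endian).
        header = struct.pack(">H", len(name))
        body = struct.pack(">iqi", self.partition, self.offset, self.max_bytes)
        return header + name + body

    @classmethod
    def decode(cls, payload: bytes) -> "FetchRequest":
        (name_len,) = struct.unpack_from(">H", payload, 0)
        topic = payload[2:2 + name_len].decode("utf-8")
        partition, offset, max_bytes = struct.unpack_from(">iqi", payload, 2 + name_len)
        return cls(topic, partition, offset, max_bytes)
```

A fixed, compact layout like this keeps per-request parsing cheap, which matters when a broker handles a very large number of fetches per second.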
Resilience & Reliability
ISR (In-Sync Replicas): Only replicas that are caught up with the leader can be elected as new leaders.
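Acks: acks=all makes the leader wait until every replica currently in the ISR has the batch before acknowledging. Pairing it with min.insync.replicas prevents acknowledgements when the ISR has shrunk to too few replicas.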
Observability
Metrics: Log Flush Latency, Partition Count, Under-replicated Partitions (URP), Byte Rate In/Out.
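The ISR and acks=all rules under Resilience & Reliability can be sketched as leader-side bookkeeping. This is a simplified model under assumptions: the class and method names are hypothetical, the ISR set includes the leader itself, and a follower fetching from offset X is taken to mean it already holds every record below X.

```python
from dataclasses import dataclass, field
from typing import Dict, Set


class NotEnoughReplicasError(Exception):
    """Raised when the ISR is smaller than min_insync_replicas."""


@dataclass
class PartitionLeader:
    leader_id: int
    isr: Set[int]                      # in-sync replica ids, leader included
    min_insync_replicas: int = 2
    log_end_offset: Dict[int, int] = field(default_factory=dict)  # replica -> next offset

    def append(self, batch_size: int) -> int:
        """Append a batch on the leader and return the offset of its last record."""
        if len(self.isr) < self.min_insync_replicas:
            raise NotEnoughReplicasError(f"ISR size {len(self.isr)}")
        first = self.log_end_offset.get(self.leader_id, 0)
        self.log_end_offset[self.leader_id] = first + batch_size
        return first + batch_size - 1

    def on_follower_fetch(self, replica_id: int, fetch_offset: int) -> None:
        """Record how far a follower has replicated."""
        self.log_end_offset[replica_id] = fetch_offset

    def high_watermark(self) -> int:
        """Lowest log end offset across the ISR; records below it are committed."""
        return min(self.log_end_offset.get(r, 0) for r in self.isr)

    def is_committed(self, offset: int) -> bool:
        """With acks=all, the producer is acknowledged once this returns True."""
        return offset < self.high_watermark()
```

Because the high watermark only advances when the slowest ISR member catches up, any replica in the ISR holds every acknowledged record and is therefore safe to elect as the new leader.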

Storage

Access Pattern
100% Sequential Writes (append to end of file).
Mostly Sequential Reads (consumers reading from specific offsets).
Database Table Design (Log Structure)
Log Segment: A file containing raw message bytes.
Index File: A sparse mapping of Offset -> Physical File Position.
Time Index: Mapping of Timestamp -> Offset.
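A sparse offset index can be sketched as two parallel sorted lists searched with binary search. The sketch below is illustrative only: the class name, the interval_bytes parameter, and the 4096-byte default are assumptions. The lookup returns the file position of the nearest indexed record at or before the target offset; the reader then scans forward in the segment from there.

```python
import bisect
from typing import List


class SparseIndex:
    def __init__(self, interval_bytes: int = 4096) -> None:
        self.interval_bytes = interval_bytes
        self._offsets: List[int] = []     # indexed message offsets, ascending
        self._positions: List[int] = []   # matching byte positions in the segment

    def maybe_add(self, offset: int, position: int) -> None:
        """Index a record only if enough bytes were written since the last entry."""
        if not self._positions or position - self._positions[-1] >= self.interval_bytes:
            self._offsets.append(offset)
            self._positions.append(position)

    def lookup(self, target_offset: int) -> int:
        """Return the byte position from which to start scanning for target_offset."""
        i = bisect.bisect_right(self._offsets, target_offset) - 1
        return self._positions[i] if i >= 0 else 0
```

Indexing only every few kilobytes keeps the index small enough to stay in memory, at the cost of a short sequential scan after each lookup.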
Technical Selection
Custom Log Engine: Use a standard filesystem (XFS/ext4) but manage file handles and memory mapping (mmap) manually for performance.
Distribution Logic
Sharding: Topic-Partition based. Partition determined by hash(key) % num_partitions.
Replication: Leader-Follower. Follower pulls from Leader.
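The hash(key) % num_partitions rule can be sketched as follows. The choice of CRC32 is an assumption made for illustration: Python's built-in hash() is randomized per process for bytes and strings, so a stable hash is needed for every producer to route a given key to the same partition. The function name partition_for is hypothetical.

```python
import zlib


def partition_for(key: bytes, num_partitions: int) -> int:
    """Map a message key to a partition; the same key always lands on the same partition."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    # zlib.crc32 returns an unsigned 32-bit value that is stable across processes.
    return zlib.crc32(key) % num_partitions
```

Note that changing num_partitions remaps most keys, so per-key ordering only holds while the partition count stays fixed.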

Cache

Purpose & Justification: Like Kafka, this design relies on the OS Page Cache instead of a custom application-level cache. This avoids double-buffering and lets the OS use all free RAM for caching log segments.
Key-Value Schema: Not applicable.
Failure Handling: Clean shutdown flushes Page Cache to disk. Unclean shutdown triggers log recovery/checksum validation.

Messaging

Purpose & Decoupling: This system is the messaging layer. Internal replication acts as a "messaging" function between brokers.
Delivery Semantics: At-least-once (default), Exactly-once (via transaction coordinator and idempotent producers).
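The idempotent-producer half of exactly-once delivery can be sketched as broker-side deduplication on a per-producer sequence number. This is a simplified model with hypothetical names; it tracks one sequence per producer for a single partition and leaves out the transaction coordinator entirely.

```python
from typing import Dict, List


class IdempotentPartitionLog:
    def __init__(self) -> None:
        self.records: List[bytes] = []
        self._last_seq: Dict[int, int] = {}  # producer id -> last accepted sequence

    def append(self, producer_id: int, seq: int, payload: bytes) -> bool:
        """Append once per (producer_id, seq); return False for a retried duplicate."""
        last = self._last_seq.get(producer_id, -1)
        if seq <= last:
            return False  # already written; the retry is acknowledged but not re-appended
        if seq != last + 1:
            raise ValueError(f"out-of-order sequence {seq}, expected {last + 1}")
        self.records.append(payload)
        self._last_seq[producer_id] = seq
        return True
```

With this check in place, a producer that retries after a lost acknowledgement gets at-least-once delivery on the wire but exactly one copy in the log.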
Wrap Up

Advanced Topics

Trade-offs (PACELC): By default, Kafka leans toward availability during a network partition and toward low latency otherwise. Setting acks=all together with min.insync.replicas trades some of both for consistency.
Reliability: If the leader fails, the Controller elects a new leader from the ISR. If no ISR is available, the system can either stay down (Consistency) or allow an "unclean" election (Availability).
Bottleneck Analysis:
Network: With 3x replication, every byte a producer writes is sent two more times between brokers (leader to two followers) and N times out to consumers. 100Gbps networking is a prerequisite.
Disk: IOPS are less important than sequential throughput. High-density HDDs are often preferred for cost, while NVMe is used for low-latency writes.
Security: TLS for encryption in transit. SASL/SCRAM for client authentication. ACLs for topic-level authorization.
Distinguishing Insights:
Log Compaction: Essential for "System of Record" use cases.
Controller Bottleneck: Moving from ZooKeeper to KRaft (Raft-based) allows the cluster to scale to millions of partitions, as the metadata is handled natively within the broker process.
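The leader-election rule from the Reliability point (prefer the ISR, optionally allow an "unclean" election) can be sketched as a small controller-side function. The function name and parameters are hypothetical, and picking the first eligible replica in assignment order is an assumption made to keep the example deterministic.

```python
from typing import Optional, Sequence, Set


def elect_leader(replicas: Sequence[int], isr: Set[int], live_brokers: Set[int],
                 allow_unclean: bool = False) -> Optional[int]:
    """Return the new leader id, or None if the partition must stay offline."""
    # Clean election: only a live replica that is in the ISR holds every committed record.
    for replica in replicas:
        if replica in isr and replica in live_brokers:
            return replica
    # Unclean election: trade possible data loss for availability.
    if allow_unclean:
        for replica in replicas:
            if replica in live_brokers:
                return replica
    return None
```

Returning None corresponds to the "stay down" choice (consistency), while allow_unclean=True corresponds to the availability choice at the risk of losing acknowledged records.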