The Question
Design: Distributed Metrics Monitoring and Aggregation System
Design a high-scale, distributed system capable of ingesting, storing, and querying 10 million metrics per second from a fleet of global microservices. The system must support real-time alerting with sub-10 second latency and provide high-performance analytical queries for long-term trend visualization (up to 1 year of data). Address challenges such as high-cardinality tags, data compression, and the trade-offs between write throughput and query latency in a cloud-native environment.
Kafka
ClickHouse
Flink
Redis
gRPC
Protobuf
Prometheus
S3
ZooKeeper
Questions & Insights
Clarifying Questions
Scale and Throughput: What is the expected volume of data points per second (DPS) and the number of unique time series (cardinality)?
Latency Requirements: What is the acceptable end-to-end latency from the time a metric is emitted to when it is queryable or triggers an alert?
Data Retention and Resolution: How long should we keep raw data versus aggregated (downsampled) data? (e.g., 10s resolution for 7 days, 1h resolution for 1 year).
Metric Types: Are we supporting Gauges, Counters, Histograms, and Summaries? Do we need to support high-cardinality tags (e.g., user_id)?
Push vs. Pull: Should the system pull metrics from targets (like Prometheus), or should clients push metrics to the system?
Assumptions for MVP:
Scale: 10 million data points per second (DPS).
Cardinality: Up to 1 million active time series.
Latency: Under 5 seconds for ingestion-to-query; under 10 seconds for alerting.
Retention: 30 days for raw data; 1 year for downsampled data.
Model: Push-based ingestion via SDKs/Sidecars for cloud-native flexibility.
Thinking Process
Core Bottleneck: How do we handle massive write-heavy traffic without losing data or impacting the monitored application's performance?
Storage Choice: How do we store time-series data to allow for efficient range scans and aggregations across many dimensions?
Query Performance: How do we ensure fast dashboard loads when querying months of data?
Reliability: How do we prevent the "Monitoring the Monitor" failure where the metrics system crashes under the same load it's trying to report?
Bonus Points
M-Block Compression: Discussing Gorilla-style (Facebook) XOR-based floating-point compression, which can reduce the storage footprint by roughly 10x.
Handling Cardinality Explosion: Implementing a "protector" service that detects and drops metrics with runaway tags (e.g., uuid as a tag) before they hit the storage layer.
Zero-Drop Ingestion: Using a local agent (sidecar) with disk-backed buffering to survive network partitions or downstream outages.
Downsampling as a Service: Implementing background jobs that pre-calculate 1-hour/1-day aggregates to optimize long-term trend queries.
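The XOR idea behind Gorilla-style compression can be illustrated with a minimal sketch. It is not the full Gorilla bit-packing format: it only XORs each value's 64-bit pattern with its predecessor and counts how many bits would actually need to be stored. All function names here are hypothetical.

```python
import struct


def float_to_bits(x: float) -> int:
    """Reinterpret a float64 as an unsigned 64-bit integer."""
    return struct.unpack(">Q", struct.pack(">d", x))[0]


def bits_to_float(b: int) -> float:
    """Reinterpret an unsigned 64-bit integer as a float64."""
    return struct.unpack(">d", struct.pack(">Q", b))[0]


def xor_encode(values):
    """First entry is the raw bit pattern; the rest are XORs with the previous value."""
    out, prev = [], 0
    for v in values:
        bits = float_to_bits(v)
        out.append(bits ^ prev)
        prev = bits
    return out


def xor_decode(deltas):
    """Invert xor_encode by replaying the XOR chain."""
    values, prev = [], 0
    for d in deltas:
        prev ^= d
        values.append(bits_to_float(prev))
    return values


def meaningful_bits(x: int) -> int:
    """Bits between the highest and lowest set bit (inclusive); 0 for an unchanged value."""
    if x == 0:
        return 0
    trailing_zeros = (x & -x).bit_length() - 1
    return x.bit_length() - trailing_zeros


samples = [12.0, 12.0, 12.5, 12.5]
encoded = xor_encode(samples)
```

Repeated values XOR to zero and nearby values differ in only a few bits, which is why a real encoder can store most samples in far fewer than 64 bits.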
Design Breakdown
Functional Requirements
Core Use Cases:
Ingest metrics (timestamp, metric name, tags, value) from distributed services.
Query metrics via a query language (e.g., SQL-like or PromQL).
Real-time alerting based on threshold violations.
Dashboard visualization support.
Scope Control:
In-scope: Data ingestion, stream processing for alerts, time-series storage, and query API.
Out-of-scope: Log tailing/searching (ELK/Splunk territory), Tracing (Jaeger territory), and Visualization UI (use Grafana integration).
Non-Functional Requirements
Scale: Support 10M DPS and horizontal scaling of all layers.
Latency: P99 Ingestion < 200ms; P99 Query < 1s for short ranges.
Availability & Reliability: 99.99% availability; data loss is highly undesirable but partial loss during catastrophic failure is preferred over system-wide blocking.
Consistency: Eventual consistency is acceptable for queries; alerting requires near-real-time processing.
Fault Tolerance: No single point of failure; must handle regional/AZ outages.
Estimation
Traffic Estimation:
10M DPS * 100 bytes per point (serialized) = 1 GB/s Ingress.
Peak traffic (3x) = 3 GB/s.
Storage Estimation:
Raw storage (no compression): 1 GB/s * 86,400 s/day * 30 days = ~2.6 PB.
With TSDB compression (e.g., 2 bytes/sample): 10M * 2 bytes * 86,400 s/day * 30 days = ~52 TB (much more manageable).
Bandwidth:
Incoming: 1 GB/s.
Outgoing (Queries/Alerts): Estimated 100-200 MB/s.
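The estimates above can be checked with a few lines of arithmetic. This sketch assumes decimal units (1 GB = 10^9 bytes) and uses the same inputs as the text; the constant names are illustrative.

```python
DPS = 10_000_000                 # data points per second
BYTES_PER_POINT = 100            # serialized size per point
COMPRESSED_BYTES_PER_SAMPLE = 2  # assumed TSDB compression ratio
SECONDS_PER_DAY = 86_400
RETENTION_DAYS = 30
PEAK_FACTOR = 3

ingress_bytes_per_s = DPS * BYTES_PER_POINT
peak_bytes_per_s = PEAK_FACTOR * ingress_bytes_per_s
raw_storage_bytes = ingress_bytes_per_s * SECONDS_PER_DAY * RETENTION_DAYS
compressed_storage_bytes = (
    DPS * COMPRESSED_BYTES_PER_SAMPLE * SECONDS_PER_DAY * RETENTION_DAYS
)

print(f"ingress:    {ingress_bytes_per_s / 1e9:.1f} GB/s")
print(f"peak:       {peak_bytes_per_s / 1e9:.1f} GB/s")
print(f"raw:        {raw_storage_bytes / 1e15:.2f} PB")
print(f"compressed: {compressed_storage_bytes / 1e12:.1f} TB")
```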
Blueprint
Concise Summary: A push-based ingestion pipeline using Kafka for load leveling, Flink for real-time windowed aggregations/alerting, and ClickHouse as a high-performance columnar time-series store.
Major Components:
Ingestion Gateway: L7 Load balancer and stateless services to validate and normalize incoming gRPC/HTTP metric streams.
Kafka: Acts as a high-throughput buffer to decouple ingestion from storage and processing.
Flink Aggregator: Performs real-time windowing to generate alerts and pre-aggregate high-frequency metrics.
ClickHouse: Provides a distributed, columnar database optimized for time-series range queries and massive write throughput.
Query Service: A stateless API layer that translates PromQL/SQL to ClickHouse queries.
Simplicity Audit: This design uses ClickHouse's built-in capability to handle massive insertions and aggregations, removing the need for a separate cache layer or complex multi-tiered storage for the MVP.
Architecture Decision Rationale:
Why this?: Columnar stores like ClickHouse outperform traditional NoSQL (Cassandra) for analytical queries (SUM, AVG) over large datasets. Kafka ensures that spikes in metric volume don't crash the database.
Functional Satisfaction: Covers end-to-end flow from metric emission to query.
Non-functional Satisfaction: Horizontally scalable at every tier.
High Level Architecture
Sub-system Deep Dive
Edge (Optional)
Content Delivery & Traffic Routing: Metrics are dynamic; no CDN. Use Geo-DNS to route traffic to the nearest regional Ingestion Gateway.
Security & Perimeter:
API Gateway: Performs MTLS or API Key validation to ensure metrics are from authorized clients.
Rate Limiting: Enforced per-tenant (service ID) to prevent a single "noisy neighbor" service from overwhelming the ingestion pipeline.
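Per-tenant rate limiting is often implemented as a token bucket. The following is a minimal in-memory sketch under that assumption; the class name, parameters, and the choice of token bucket are illustrative, and a real gateway would need shared state across instances.

```python
import time


class TenantRateLimiter:
    """One token bucket per service ID; each data point costs one token."""

    def __init__(self, rate: float, capacity: float):
        self.rate = rate          # tokens refilled per second
        self.capacity = capacity  # maximum burst size
        self._buckets = {}        # service_id -> (tokens, last_refill_time)

    def allow(self, service_id: str, points: int, now: float = None) -> bool:
        now = time.monotonic() if now is None else now
        tokens, last = self._buckets.get(service_id, (self.capacity, now))
        # Refill according to elapsed time, capped at the burst capacity.
        tokens = min(self.capacity, tokens + (now - last) * self.rate)
        allowed = points <= tokens
        if allowed:
            tokens -= points
        self._buckets[service_id] = (tokens, now)
        return allowed


limiter = TenantRateLimiter(rate=100, capacity=200)
```

Because each service ID has its own bucket, a noisy tenant exhausts only its own budget while other tenants continue to be admitted.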
Service
Topology & Scaling: Stateless Ingestion and Query services deployed in Kubernetes. Scaled based on CPU and Network I/O.
API Schema Design:
Endpoint: POST /v1/metrics
Protocol: gRPC (Protobuf) for efficiency and low overhead.
Payload: message Metric { string name = 1; map<string, string> tags = 2; int64 timestamp = 3; double value = 4; }
SLA: 99.9% ingestion success.
Resilience:
Backpressure: If Kafka is full, the Ingestion Gateway returns HTTP 429/503, allowing clients to buffer locally.
Circuit Breaker: Used between the Query Service and ClickHouse to prevent slow queries from cascading.
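A circuit breaker of the kind described above can be sketched as follows. This is a simplified illustration, not a specific library's API: the class names, thresholds, and the injectable clock are assumptions made for the example.

```python
import time


class CircuitOpenError(RuntimeError):
    """Raised when the breaker is open and the call is rejected immediately."""


class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, fn, *args, **kwargs):
        half_open = False
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("circuit open; failing fast")
            half_open = True  # timeout elapsed: let one trial call through
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            if half_open or self.failures >= self.failure_threshold:
                self.opened_at = self.clock()  # (re)open the circuit
            raise
        # Success closes the circuit and clears the failure count.
        self.failures = 0
        self.opened_at = None
        return result
```

In the Query Service, `fn` would be the function that runs a ClickHouse query with a timeout, so repeated slow queries trip the breaker instead of tying up request threads.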
Storage
Access Pattern: Write-heavy (10M DPS). Read-heavy for dashboards (range scans on metric_name and time).
Database Table Design (ClickHouse):
Table: metrics_raw
Fields: metric_name (String), tags (Map(String, String)), timestamp (DateTime64), value (Float64), date (for partitioning).
Primary Key: (metric_name, tags, timestamp).
Partitioning: Partition by date to allow easy deletion of old data.
Technical Selection: ClickHouse.
Rationale: Superior compression and specialized engines such as SummingMergeTree for pre-aggregation.
Distribution Logic: Sharding by metric_name to ensure all data for a specific metric resides on the same shard for faster local aggregations.
Cache
Purpose: Reducing ClickHouse load for frequent metadata lookups (e.g., "List all tag names for metric X").
Key-Value Schema:
Key: metadata:{metric_name}
Value: Set of tag keys/values.
TTL: 1 hour.
Technical Selection: Redis. Used for service discovery of metrics and caching frequently accessed dashboard results.
Messaging
Purpose: Load leveling and decoupling.
Event Schema: Protobuf serialized metric points.
Throughput & Partitioning: Kafka topic metric-ingestion partitioned by metric_name to maintain ordering per time series and enable parallel processing by Flink.
Failure Handling: Dead-letter queue (DLQ) for malformed metrics (e.g., invalid timestamps).
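Key-based partitioning can be sketched as a stable hash of the key modulo the partition count. The example below uses CRC32 as a stand-in hash (Kafka's default partitioner uses a different hash function), and the helper name and separator are hypothetical. Optional extra key parts show how a composite key such as (metric_name, host_id) spreads one metric across partitions.

```python
import zlib


def partition_for(metric_name: str, num_partitions: int, *extra_key_parts: str) -> int:
    """Map a key to a partition using a stable hash, so the same key always lands together."""
    # The unit-separator character avoids collisions such as ("ab", "c") vs ("a", "bc").
    key = "\x1f".join((metric_name, *extra_key_parts)).encode("utf-8")
    return zlib.crc32(key) % num_partitions


NUM_PARTITIONS = 64

# Keyed by metric name only: every cpu_usage point goes to one partition.
single = partition_for("cpu_usage", NUM_PARTITIONS)

# Keyed by (metric name, host): the same metric fans out across partitions.
spread = {
    partition_for("cpu_usage", NUM_PARTITIONS, f"host-{i}") for i in range(1000)
}
```

The trade-off is that a composite key gives up single-partition ordering for the metric as a whole and keeps ordering only per (metric, host) pair.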
Data Processing
Processing Model: Flink Streaming.
Processing DAG:
Source: Kafka.
Window: 10-second tumbling windows.
Aggregations: Calculate Min, Max, Avg.
Sink: Push alerts to Alert Manager if thresholds are exceeded; write aggregates back to a "downsampled" Kafka topic for ClickHouse.
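Correctness: At-least-once processing is sufficient for metrics, provided duplicate writes to ClickHouse are made idempotent (e.g., via insert deduplication on replicated tables or a ReplacingMergeTree table).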
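The windowing logic of the DAG above can be illustrated without Flink. This is a batch sketch of 10-second tumbling windows with Min/Max/Avg and a threshold check; it ignores watermarks, late data, and state checkpointing, and all names are hypothetical.

```python
from collections import defaultdict
from dataclasses import dataclass

WINDOW_SECONDS = 10


@dataclass
class Agg:
    count: int = 0
    total: float = 0.0
    minimum: float = float("inf")
    maximum: float = float("-inf")

    def add(self, value: float) -> None:
        self.count += 1
        self.total += value
        self.minimum = min(self.minimum, value)
        self.maximum = max(self.maximum, value)

    @property
    def avg(self) -> float:
        return self.total / self.count


def aggregate(points):
    """points: iterable of (series_key, timestamp_seconds, value)."""
    windows = defaultdict(Agg)
    for key, ts, value in points:
        # Tumbling windows: each timestamp belongs to exactly one window.
        window_start = int(ts // WINDOW_SECONDS) * WINDOW_SECONDS
        windows[(key, window_start)].add(value)
    return dict(windows)


def breaching_windows(windows, threshold: float):
    """Return (series_key, window_start) pairs whose average exceeds the threshold."""
    return sorted(k for k, agg in windows.items() if agg.avg > threshold)


points = [("cpu", 0, 10.0), ("cpu", 9, 30.0), ("cpu", 10, 95.0)]
windows = aggregate(points)
alerts = breaching_windows(windows, threshold=90.0)
```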
Infrastructure (Optional)
Observability: The system monitors itself using a secondary, isolated instance of the same architecture.
Distributed Coordination: ClickHouse uses Keeper (or ZooKeeper) for cluster metadata and replication coordination.
Wrap Up
Advanced Topics
Trade-offs: We choose Availability over Consistency (AP). In a network partition, we prefer to accept metrics even if they can't be queried immediately.
Reliability:
Kafka as a buffer: Protects against ClickHouse maintenance or performance degradation.
Horizontal Scaling: Every component (Gateway, Kafka, Flink, ClickHouse) can scale by adding nodes.
Bottleneck Analysis:
High Cardinality: If a user includes user_id in tags, ClickHouse index size explodes. Optimization: Implement a cardinality limiter in the Ingestion Gateway that rejects metrics exceeding 10^5 unique tag combinations per hour.
Hot Shards: A single metric name (e.g., cpu_usage) from 1 million servers can create a hot shard. Solution: Use a composite shard key hash(metric_name, host_id).
Security: Data is encrypted at rest (ClickHouse encryption) and in transit (TLS). RBAC is applied at the Query Service level to restrict access to sensitive metrics.
Distinguishing Insights: For massive scale, implement Tiered Storage. Keep the last 7 days of data on NVMe SSDs in ClickHouse and move older data to S3 (using ClickHouse's S3-backed disks and storage policies) to significantly reduce costs while keeping data queryable.
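The cardinality limiter mentioned under Bottleneck Analysis could look roughly like the sketch below. It assumes the limit applies per metric name over fixed one-hour windows and tracks tag combinations in an exact in-memory set; a production version would more likely use a probabilistic counter and shared state. The class and method names are hypothetical.

```python
class CardinalityLimiter:
    """Rejects new tag combinations once a metric exceeds its hourly budget."""

    def __init__(self, max_series: int = 100_000, window_seconds: int = 3600):
        self.max_series = max_series
        self.window_seconds = window_seconds
        self._window = {}  # metric_name -> index of the current window
        self._seen = {}    # metric_name -> set of tag combinations seen in that window

    def admit(self, metric_name: str, tags: dict, now: float) -> bool:
        window = int(now // self.window_seconds)
        if self._window.get(metric_name) != window:
            # A new hour has started: reset the budget for this metric.
            self._window[metric_name] = window
            self._seen[metric_name] = set()
        seen = self._seen[metric_name]
        combo = tuple(sorted(tags.items()))
        if combo in seen:
            return True   # existing series are always accepted
        if len(seen) >= self.max_series:
            return False  # new series would exceed the budget
        seen.add(combo)
        return True


limiter = CardinalityLimiter(max_series=2)
```

Existing series keep flowing after the budget is hit, so a runaway tag only blocks the new combinations it creates rather than the whole metric.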