The Question
Design: Global Large-Scale Log Aggregation System
Design a highly available and cost-effective log collection system for a global infrastructure spanning 10 data centers with over 1 million servers. The system must handle a combined throughput of 10 PB of log data per day, ensuring logs are searchable within one minute while providing long-term archival. Address challenges regarding cross-region network latency, data durability during partitions, and the cost trade-offs between real-time indexing and cold storage.
Kafka
OpenSearch
S3
Vector
Fluent Bit
Go
gRPC
Zstandard
Athena
TLS 1.3
Questions & Insights
Clarifying Questions
Scale and Volume: What is the total number of servers across all 10 data centers, and what is the average/peak log volume (GB/day or messages/second) per server?
Retention & Latency: How long should logs be searchable (hot storage) vs. archived (cold storage), and what is the target E2E latency (from log generation to searchability)?
Data Shape: Are logs structured (JSON), semi-structured, or raw text? Are there specific PII/compliance requirements for scrubbing data before it leaves the region?
Availability vs. Durability: Is it acceptable to lose logs during a catastrophic network partition, or is "at-least-once" delivery a hard requirement?
Assumptions:
Scale: 1,000,000 servers across 10 DCs (~100k per DC).
Volume: 10 GB per server per day = 10 PB/day total.
Latency: Logs should be searchable within 1 minute.
Retention: 7 days hot (searchable), 30 days cold (archived).
Thinking Process
The Global Ingress Bottleneck: How do we prevent cross-continent network latency and egress costs from killing the ingestion performance?
Regional Buffering: Why is a local message bus (Kafka) necessary in every DC to handle bursts and network partitions?
Tiered Storage Strategy: How do we manage the 10 PB/day scale without breaking the bank? (Decoupling raw storage from indexing).
End-to-End Delivery: How do we ensure exactly-once or at-least-once delivery from the agent to the central search engine?
Bonus Points
Log Vectorization/Sampling: Using "Vector" or "Fluent Bit" at the edge to perform dynamic sampling of "INFO" logs while keeping 100% of "ERROR" logs during high-load periods to save costs.
Backpressure Propagation: Implementing a feedback loop where the ingestion layer can signal the agents to slow down or drop low-priority logs if the storage layer is lagging.
Delta Lake/Parquet Optimization: Storing raw logs in Parquet format on S3 with Zstandard compression to achieve 5x-10x storage cost reduction compared to raw text.
Geo-Aware Routing: Using anycast addressing or geo-aware DNS to route agent traffic to the closest regional collector automatically.
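The level-based sampling idea above can be sketched as a small policy function. This is only an illustration in Python, not Vector or Fluent Bit configuration; the rate tables, the `high_load` flag, and the function name `should_keep` are assumptions made for the example.

```python
import random

# Hypothetical policy: fraction of events kept per level.
NORMAL_RATES = {"ERROR": 1.0, "WARN": 1.0, "INFO": 1.0, "DEBUG": 0.1}
HIGH_LOAD_RATES = {"ERROR": 1.0, "WARN": 1.0, "INFO": 0.1, "DEBUG": 0.0}


def should_keep(level, high_load, rng=random):
    """Decide whether one event survives sampling.

    ERROR and WARN are always kept; INFO and DEBUG are thinned out
    when the pipeline reports high load.
    """
    rates = HIGH_LOAD_RATES if high_load else NORMAL_RATES
    rate = rates.get(level, 1.0)  # unknown levels are kept, to be safe
    # random() is in [0, 1), so rate 1.0 always keeps and 0.0 always drops.
    return rng.random() < rate
```

In a real agent the `high_load` signal would come from the backpressure feedback loop described above rather than from a static flag.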
Design Breakdown
Functional Requirements
Core Use Cases:
Collect logs from heterogeneous servers (Linux/K8s).
Transport logs from 10 global DCs to a central analytics hub.
Provide a searchable interface for real-time debugging.
Scope Control:
In-scope: Log ingestion, regional aggregation, transformation/masking, and long-term storage.
Out-of-scope: Building a custom visualization UI (assume Grafana/Kibana), sophisticated anomaly detection AI.
Non-Functional Requirements
Scale: 10 PB/day, 1M+ agents, roughly 230 million events per second on average (see Estimation).
Latency: Ingestion to search visibility < 60 seconds.
Availability: 99.99% for ingestion (logs must be accepted even if search is down).
Consistency: At-least-once delivery (no acknowledged log is lost; duplicates are tolerated).
Security: Mutual TLS (mTLS) for agent-to-collector communication; encryption at rest.
Estimation
Traffic:
10 PB / 86,400 s ≈ 115 GB/s aggregate write throughput.
Average Message Size: 500 bytes.
Total QPS: 115 GB/s / 500 bytes ≈ 230 million events/sec.
Storage:
7 Days Hot Searchable: 70 PB (Requires massive indexing clusters).
30 Days Cold: 300 PB (S3 Standard-IA or Glacier Instant Retrieval).
Bandwidth:
DC Egress: ~11.5 GB/s per data center.
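The figures above can be reproduced with a few lines of arithmetic. The sketch below assumes decimal units (1 GB = 10^9 bytes, 1 PB = 10^15 bytes) and an even spread of traffic across the day and across data centers, which real traffic will not match at peak.

```python
# Back-of-envelope numbers derived from the stated assumptions.
SERVERS = 1_000_000
GB_PER_SERVER_PER_DAY = 10
DATA_CENTERS = 10
AVG_EVENT_BYTES = 500
SECONDS_PER_DAY = 86_400

GB = 10**9
PB = 10**15

daily_bytes = SERVERS * GB_PER_SERVER_PER_DAY * GB     # total volume per day
write_bytes_per_s = daily_bytes / SECONDS_PER_DAY      # aggregate write rate
events_per_s = write_bytes_per_s / AVG_EVENT_BYTES     # average event rate
per_dc_bytes_per_s = write_bytes_per_s / DATA_CENTERS  # average egress per DC
hot_bytes = 7 * daily_bytes                            # 7 days searchable
cold_bytes = 30 * daily_bytes                          # 30 days archived

print(f"daily volume : {daily_bytes / PB:.0f} PB")
print(f"write rate   : {write_bytes_per_s / GB:.1f} GB/s")
print(f"event rate   : {events_per_s / 1e6:.0f} million/s")
print(f"per-DC egress: {per_dc_bytes_per_s / GB:.1f} GB/s")
print(f"hot storage  : {hot_bytes / PB:.0f} PB")
print(f"cold storage : {cold_bytes / PB:.0f} PB")
```

These are raw, uncompressed averages; replication, index overhead, and compression all shift the real storage and bandwidth numbers.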
Blueprint
Concise Summary: A tiered global collection pipeline using lightweight agents at the edge, regional Kafka clusters for buffering, and a central ingestion service that routes data to OpenSearch (Hot) and S3 (Cold).
Major Components:
Log Agent: Lightweight sidecar (Vector/Fluent Bit) on every server for collection and local compression.
Regional Message Bus: Kafka cluster in each of the 10 DCs to provide 4-hour local retention and absorb network spikes.
Aggregator Service: Regional workers that parse, mask PII, and batch logs for cross-region transport.
Global Storage Hub: Centralized S3 bucket for raw logs and OpenSearch cluster for indexed search.
Simplicity Audit: This design avoids complex global consensus. By using regional buffers, we decouple DC failure domains and optimize cross-region bandwidth through batching and compression.
Architecture Decision Rationale:
Why this?: Regional buffering lets each DC keep accepting logs when the WAN is slow or partitioned. Direct writes to a central hub would couple every agent to cross-continent round-trip latency and network instability.
Functional Satisfaction: Meets the collection and search requirements through a clear split between raw storage and indexing.
Non-functional Satisfaction: High availability via Kafka's replication and S3's durability; horizontal scaling via consumer groups.
High Level Architecture
Sub-system Deep Dive
Edge (Optional)
Content Delivery & Traffic Routing: Agents use a static regional endpoint or DNS-based discovery (e.g., collector.dc1.example.com) to ensure logs stay within the DC for the first hop.
Security & Perimeter:
API Gateway: Each DC has a local L7 Load Balancer terminating TLS 1.3.
Rate Limiting: Per-server limits to prevent a single "noisy" service from overwhelming the regional Kafka.
Authentication: Agents use short-lived JWTs or mTLS certificates.
Service
Topology & Scaling:
Aggregators: Stateless Go-based services deployed in K8s. Scaled horizontally based on Kafka consumer lag.
Isolation: Each DC is a separate failure domain. A failure in DC-1 Kafka does not affect DC-2.
API Schema Design:
Endpoint: POST /v1/logs/ingest
Protocol: gRPC (for high-throughput binary serialization) or REST/JSON.
Request: { "timestamp": int64, "metadata": map, "message": string, "level": enum }
Idempotency: Agents attach a unique UUID per batch to prevent duplicates during retries.
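One way the per-batch UUID could be used on the collector side is a bounded "recently seen" set. The sketch below is a single-process illustration only; the class name `BatchDeduplicator`, the `batch_id` field, and the capacity are assumptions, and a real deployment would need the set shared across collector instances.

```python
import uuid
from collections import OrderedDict


class BatchDeduplicator:
    """Remembers recently seen batch IDs so a retried batch is accepted once."""

    def __init__(self, capacity=100_000):
        self.capacity = capacity
        self._seen = OrderedDict()  # insertion order doubles as age

    def first_time(self, batch_id):
        """Return True if this batch ID has not been seen recently."""
        if batch_id in self._seen:
            self._seen.move_to_end(batch_id)  # refresh its age
            return False
        self._seen[batch_id] = True
        if len(self._seen) > self.capacity:
            self._seen.popitem(last=False)  # evict the oldest ID
        return True


def new_batch(events):
    """Agent side: wrap events with a fresh UUID that is reused on every retry."""
    return {"batch_id": str(uuid.uuid4()), "events": events}
```

Because the memory is bounded, a very late retry can still slip through as a duplicate, which is consistent with the at-least-once guarantee stated in the requirements.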
Resilience:
Retry Policy: Agents use exponential backoff with jitter if the regional LB is unreachable.
Graceful Degradation: If regional Kafka is full, agents can spill logs to local disk (limited buffer).
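The retry policy above can be sketched as follows. This uses the "full jitter" variant of exponential backoff; the base delay, cap, attempt count, and the `send` and `sleep` callables are assumptions for illustration, not parameters of any particular agent.

```python
import random


def backoff_delays(base=0.5, cap=60.0, attempts=6, rng=random):
    """Yield full-jitter delays: uniform in [0, min(cap, base * 2**attempt)]."""
    for attempt in range(attempts):
        ceiling = min(cap, base * (2 ** attempt))
        yield rng.uniform(0, ceiling)


def send_with_retry(send, batch, sleep, **backoff_kwargs):
    """Call send(batch); on ConnectionError wait a jittered delay and retry.

    Returns True on success, False once all attempts are used up, at which
    point the caller would spill the batch to the local disk buffer.
    """
    for delay in backoff_delays(**backoff_kwargs):
        try:
            send(batch)
            return True
        except ConnectionError:
            sleep(delay)
    return False
```

Passing `time.sleep` as the `sleep` argument gives real waiting; injecting it keeps the function easy to test. The jitter matters at this scale because a million agents retrying on the same schedule would hit the regional load balancer in synchronized waves.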
Storage
Access Pattern: Write-heavy (115 GB/s). Reads are sparse (debugging/auditing) but require low latency for the last 15 minutes of data.
Database Table Design:
S3 Hierarchy: s3://logs/tenant/year/month/day/hour/region_host_uuid.parquet
OpenSearch Index: Daily rolling indices.
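A minimal sketch of building object keys for the S3 hierarchy above. The function name `s3_object_key` is hypothetical, and the use of UTC for the time components is an assumption; the source does not say which time zone the path uses.

```python
from datetime import datetime, timezone


def s3_object_key(tenant, region, host, batch_uuid, ts):
    """Build 'tenant/year/month/day/hour/region_host_uuid.parquet'.

    ts must be a timezone-aware datetime; it is converted to UTC so that
    every region writes into the same hourly prefixes.
    """
    ts = ts.astimezone(timezone.utc)
    return f"{tenant}/{ts:%Y/%m/%d/%H}/{region}_{host}_{batch_uuid}.parquet"
```

For example, a batch from host web-42 in dc1 written at 07:30 UTC on 5 March 2024 for tenant payments lands under payments/2024/03/05/07/. Time-partitioned prefixes let query engines prune by date and hour instead of scanning the whole bucket.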
Technical Selection:
S3: Ultimate durability and cost-effective scaling for 10 PB/day.
OpenSearch: Distributed search engine for hot data.
Distribution Logic:
Sharding: OpenSearch indices are sharded by service_id or host_id to avoid hot shards during specific incident investigations.
Reliability: S3 provides 99.999999999% durability. OpenSearch is configured with 1 replica across availability zones.
Cache
Purpose: Metadata Cache (Redis) stores the schema and mapping for different log types to speed up the Indexing Service.
Key-Value Schema: schema:{service_name} -> {JSON_mapping_properties}.
Technical Selection: Redis (Cluster mode).
Failure Handling: If Redis is down, the Indexing Service falls back to a default "raw-text" index to ensure ingestion doesn't stop.
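The fallback behavior described above might look like the sketch below. The cache is injected as a plain `cache_get` callable so the example stays self-contained; in practice it would wrap a Redis client's get call. The shape of `DEFAULT_MAPPING` and the function name are assumptions.

```python
import json

# Hypothetical default used when no schema is available for a service.
DEFAULT_MAPPING = {
    "index": "raw-text",
    "properties": {"message": {"type": "text"}},
}


def resolve_mapping(service_name, cache_get):
    """Look up schema:{service_name}; fall back to raw-text on any failure.

    A cache outage, a cache miss, and a corrupt cache entry all lead to the
    same default, so ingestion never blocks on the metadata cache.
    """
    try:
        raw = cache_get(f"schema:{service_name}")
        if raw is not None:
            return json.loads(raw)
    except Exception:  # broad on purpose: no cache error may stop ingestion
        pass
    return DEFAULT_MAPPING
```

Logs indexed under the default mapping lose field-level search until they are reindexed, so the fallback trades query quality for ingestion availability.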
Messaging
Purpose: Regional Kafka decouples the unpredictable log generation from the cross-region WAN transport.
Throughput & Partitioning:
Partition key: host_id to preserve ordering for a specific server's logs, or trace_id to keep one request's logs together in the same partition.
Technical Selection: Kafka (self-managed or Confluent) for high throughput and disk-backed persistence.
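To illustrate why a stable partition key preserves per-host ordering, the sketch below maps a key to a partition with a deterministic hash. It uses CRC32 purely for illustration; Kafka's own default partitioner uses a different hash, and the function name `partition_for` is hypothetical.

```python
import zlib


def partition_for(key, num_partitions):
    """Map a key to a partition deterministically.

    The same key always maps to the same partition, and a partition is an
    ordered log, so all events for one host_id keep their relative order.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions
```

The flip side is skew: a single very chatty host sends all of its traffic to one partition, which is one reason the edge applies per-server rate limits.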
Data Processing
Processing Model: Streaming via the Aggregator Service.
Processing DAG:
Consume: Read from Regional Kafka.
Transform: Parse Regex/Grok patterns into JSON.
Enrich: Add DC-location, Environment, and Instance-Type tags.
Mask: Redact patterns matching credit cards or emails.
Batch/Compress: Gzip/Zstd batches for WAN transport.
Technical Selection: Custom Go Service for high-performance concurrent processing. Spark is avoided (YAGNI) as complex stateful windowing isn't needed for simple log collection.
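The DAG above can be sketched end to end in a few functions. This is a Python illustration of the stages, not the Go service itself. The log line format, the regular expressions, and the tag names are assumptions, the masking patterns are deliberately rough, and gzip stands in for Zstandard because it ships with the standard library.

```python
import gzip
import json
import re

# Assumed line format: "<timestamp> <LEVEL> <message>".
LINE_RE = re.compile(r"^(?P<ts>\S+) (?P<level>[A-Z]+) (?P<message>.*)$")
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
# Rough card pattern: 13 to 16 digits, optionally separated by spaces or dashes.
CARD_RE = re.compile(r"\b\d(?:[ -]?\d){12,15}\b")


def parse(line):
    """Transform: raw text to a dict, or None if the line does not match."""
    match = LINE_RE.match(line)
    return match.groupdict() if match else None


def enrich(event, tags):
    """Enrich: attach DC, environment, and similar tags."""
    return {**event, **tags}


def mask(event):
    """Mask: redact emails and card-like digit runs in the message."""
    message = EMAIL_RE.sub("[EMAIL]", event["message"])
    message = CARD_RE.sub("[CARD]", message)
    return {**event, "message": message}


def process_batch(lines, tags):
    """Run the stages over one batch.

    Returns (compressed newline-delimited JSON, lines that failed parsing).
    The rejected lines are what a dead letter queue would receive.
    """
    events, rejected = [], []
    for line in lines:
        parsed = parse(line)
        if parsed is None:
            rejected.append(line)
            continue
        events.append(mask(enrich(parsed, tags)))
    payload = "\n".join(json.dumps(e) for e in events).encode("utf-8")
    return gzip.compress(payload), rejected
```

Masking runs before compression and before the batch leaves the region, so unredacted PII never crosses the WAN.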
Infrastructure (Optional)
Observability: Prometheus metrics for "logs_received_total" vs "logs_indexed_total" to monitor data loss.
Distributed Coordination: ZooKeeper/KRaft for Kafka cluster management.
Wrap Up
Advanced Topics
Trade-offs: We prioritize Availability (A) and Partition Tolerance (P) over Consistency (C). In a network split, logs are buffered regionally. Duplicate logs may occur (at-least-once) but data loss is minimized.
Reliability: Use of Dead Letter Queues (DLQ) for logs that fail parsing (e.g., malformed JSON) so they don't block the pipeline.
Bottleneck Analysis:
Search Index Size: Indexing 10 PB/day in OpenSearch is prohibitively expensive.
Optimization: Implement Log Triage. Index only "ERROR" and "WARN" logs. For "INFO" logs, store them in S3 and use S3 Select or Athena for on-demand searching to save millions in infrastructure costs.
Security: Data is encrypted using TLS 1.3 in transit and AES-256 at rest in S3.
Distinguishing Insights: At this scale, the "Index Everything" approach fails. A Staff Engineer would suggest Index-on-Demand or Hydration. Store everything in S3; if a dev needs to search "INFO" logs from 2 hours ago, the system "hydrates" (ingests) just that specific slice into the search engine.
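The triage and hydration ideas above reduce to two small decisions: where each event goes, and which archive prefixes to load when someone asks for an unindexed time slice. The sketch below assumes the hourly S3 layout from the Storage section and UTC timestamps; the function names and destination labels are hypothetical.

```python
from datetime import datetime, timedelta, timezone

INDEXED_LEVELS = {"ERROR", "WARN"}


def route(event):
    """Triage: every event is archived; only ERROR and WARN are also indexed."""
    level = str(event.get("level", "")).upper()
    destinations = ["s3"]
    if level in INDEXED_LEVELS:
        destinations.append("opensearch")
    return destinations


def hourly_prefixes(tenant, start, end):
    """Hydration: list the 'tenant/year/month/day/hour/' prefixes covering
    the half-open window [start, end). Both bounds must be timezone-aware."""
    cursor = start.astimezone(timezone.utc).replace(minute=0, second=0, microsecond=0)
    end = end.astimezone(timezone.utc)
    prefixes = []
    while cursor < end:
        prefixes.append(f"{tenant}/{cursor:%Y/%m/%d/%H}/")
        cursor += timedelta(hours=1)
    return prefixes
```

A hydration job would read the Parquet objects under those prefixes and bulk-load them into a temporary index, so the cost of indexing INFO logs is paid only for the slices someone actually searches.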