In this system-design, we will build a multi-component highly-distributed solution for monitoring, alerting, and dashboarding. These are critical services for maintaining high quality, highly available, fault-tolerant software. This solution will need to operate at a massive scale with potentially billions of requests per day. We will leverage a micro-service architecture to handle the very high volume and high availability requirements. For the metrics, we will separate areas of concern between data-collection, data-ingestion, data-processing, alerting, and dashboarding.
Requirements:
- support monitoring large scale service (e.g. billions of requests per day, 100m users per day, 10k+ servers)
- retain queryable data for 1 year
- support variety of metrics including counters, rates, timers, custom metrics, and labels/dimensions
- support for variety of metric granularities from 5 seconds to 1 day
- support for real-time alerting for on-call engineers via multiple channels
- support for custom dashboards and visualizations
- high availability and reliability to monitor and alert during outages
- queries for data within last 12 hours should return in less than 3 seconds (p99)
- queries for data beyond 12 hours should return in less than 15 seconds (p99)
- Software Architecture - Delivery and Taxi Transport Platform with Gig-Economy Marketplace
- Software Architecture - Mapping, Navigation, and ETA System
- Software Architecture - Payments System
- Software Architecture - Hotel Reservation Booking System
- Software Architecture - Translations Service & Clients
High Level Design
Components:
- Data Collection - Agent to collect metrics from hosts
- Data Ingestion - Service to receive and enqueue metrics
- Data Processing and Persistence - Two services for storing and querying metrics; persistence layer
- Alerts - Service for alerting via multiple channels
- Dashboards - Service for dashboard visualizations
Data Collection
Data collection entails the capture and emission of metrics from each client system. This is a client-side component requiring ultra low-latency writing of metrics, batching and aggregation of metric data, and emitting the data to our data ingestion service. For data durability, we will need to balance low-latency in-memory writes versus persisting data for data recovery.
Push vs Pull:
Sending the data from the client hosts to the data ingestion service can happen in one of two ways. Data can be pulled from each client by the collection service (e.g. Prometheus) or a client agent can push data from hosts to the collection service (e.g. CloudWatch, Datadog, New Relic, Graphite, Splunk). Both approaches are well adopted and there are some trade-offs to consider.
Pulling the data requires the ingestion service to maintain a registry of all the client hosts to periodically poll for data. At the scale we are operating at this would require a fleet of servers to horizontally scale polling clients. Each ingestion server would serve a separate group of clients partitioned using a hash function to designate server-to-client mappings. A consistent hashing function would be used to minimize remapping servers-to-clients as additional clients are added and removed over time. Each client host then would expose a standardized endpoint which the ingestion service would poll. This provides the benefit of built-in upness monitoring of client services. A downside of this approach is it requires ingress network access to all hosts from the ingestion service which may be problematic depending on the network topology. It also may not be optimal for short-lived jobs which may not get registered and polled quickly enough, though there are options to workaround this. For low-traffic clients, polling may result in unnecessary additional network traffic (typically TCP) too.
Pushing the data requires clients to know the server endpoints to connect with. Similarly, this would require a fleet of servers to horizontally scale clients pushing data. Given clients are the ones initiating requests we would leverage a load balancer to distribute requests across server hosts. On the client-side an out-of-process agent would collect metrics from that host and push them to the ingestion service. A downside of this approach is that it is trickier to determine when a client host is down or just delayed in pushing metrics (typically UDP). Data authenticity is also a consideration here to prevent bad actors pushing bad data. We will implement the push-model due to its simplicity, broad adoption, and easy compatibility especially with short-lived jobs.
Client Agent:
Data will be collected in-memory, persisted to disk, and then routinely emitted by an out-of-process agent running on each host. This is for several reasons - (1) we want writing metrics to be low-latency and to not block client processing, (2) we do not want to lose significant metric data if the host dies, and (3) given the volume of data and latency incurred across the network we will need to batch these requests to reduce network-volume.
Implementation will vary based on the production client infrastructure. Clients using a VM-based deployment (e.g. EC2) will leverage a separate daemon process whereas clients using a container-based deployment (e.g. Docker Desktop, AWS Fargate, Kubernetes) will leverage a separate container sidecar. Mobile clients will use a background process.
Data Transfer, Batching, and Compression:
To optimize network volume, the client agent will batch and compress metric data into discrete time intervals and/or size-limited batches prior to pushing them up to the data ingestion service. Time-based batching is a common approach but one must consider their data-loss tolerance. If during the batching window the client host goes down, any data not persisted to disk and only in-memory will be lost; further, if the host is deleted, even disk-persisted data will be lost. Therefore a trade-off here is between optimizing performance versus ensuring data durability and recovery. We will go with a 2-second batching of up to 2MB of metrics over the network, though clients can override this if needed. Once the stored metric data hits 2 seconds or exceeds 2MB it will be sent to the ingestion service. Metric data persisted to disk will be rotated every 50 MB to not exhaust host disk space. Data itself will further be compressed (e.g. Gzip) prior to transmission.
Data Ingestion
Ingestion will need to support massive write scale, high availability, and durability. The metrics service going down should not result in loss of data. We will keep the data ingestion scaling independent of the data processing. The ingestion service itself will be very straightforward, simply parsing data, performing basic validation, and enqueueing data for later processing.
Load Balancing and Horizontal Scaling:
The ingestion service will be fronted by a set of load balancers. Each load balancer will be registered under a DNS A record for the client agent to connect to. If one load balancer or availability zone goes down there will be redundancy through round-robin DNS resolution to a healthy load balancer. The load balancers will distribute traffic to a wide fleet of hosts responsible for enqueuing data. Additional hosts are added automatically based on ingestion rate and scale. Likewise the fleet can be scaled-in when throughput is reduced. At smaller scales and with irregular traffic, a serverless compute offering such as AWS Lambda will work well here, as long as cold-starts are kept in control; as the scale increases, which we anticipate, a container-based compute offering such as Kubernetes or AWS Fargate will be more efficient.
Enqueueing Data:
Hosts in the fleet responsible for enqueuing data are quite simple. An upstream edge routing service will handle authentication and authorization, then the ingestion service will validate clients and their data, before then pushing the data into the queue. This enables us to decouple the ingestion of data from the processing and storage of the data. We will want to use a queue which has high-durability with persistence to disk to recover data in the case of in-memory data-loss. Many queueing services are available including Amazon SQS, Kafka, ActiveMQ, RabbitMQ, or even Redis. Kafka is widely popular, offers write-to-disk persistence, and stream processing capabilities so we will integrate with Kafka for the queuing solution. The Kafka queue can be partitioned by client to ensure one client pushing enormous volumes of data does not negatively impact other clients. Each message will have a unique SHA hash-based identifier to support idempotency and prevent duplication.
Data Processing and Persistence
The persistence layer may be the most interesting element of this system-design given the scale of write-volume and requirements on low read-latency. We will need to carefully consider how to optimally store metrics data with regards primarily for read-performance, but also considering write-performance and infrastructure cost.
Dequeuing Data:
With data enqueued in Kafka, a second fleet of hosts dedicated to data processing will dequeue, process, and persist data into our persistence layer. This fleet can scale horizontally as the Kafka queue grows and becomes backed up. We will need to consider how to prevent multiple consumers in this processing-fleet from reading the same data causing improper calculations. Different queueing solutions offer different mechanisms to handle this. Amazon SQS uses “visibility timeouts” where messages in the queue are marked hidden with a timeout when first read; if a second call to dequeue the message is not received prior to the timeout the message becomes visible to all consumers again. In this way consumers read a message, process it, and then dequeue the message; while other consumers do not see the in-process hidden message and rather can read the next message in the queue. If a consumer host goes down, that data is not lost as it’s still in SQS and will become visible to new consumers once the visibility-timeout expires. Kafka on the other hand uses “consumer groups” where each consumer in the same consumer group is mapped to a particular data-partition of the Kafka queue. We will have a single consumer group with many consumers so that each consumer receives a distinct slice of the enqueued data. Consumers may filter events by end-client, metric-name, and geographic-region.
Persisting Data to the Database:
As processing hosts consume metric data from the queue, they process the data into a read-optimized persistence store for clients like the Alerting and Dashboarding services. This is a critical component of the overall design and will have a significant impact on performance.
Data will be accessed typically by a combination of: (1) metric-name, (2) geographic-region, (3) time-window, (4) periodization, and optionally a set of (5) dimensions. The metric-name is self-explanatory, for example “cpu_utilization”. The geographic-region granularity can vary, for now we will use the AWS region of the client, but this could be more or less granular depending on needs. The time-window is the start and end dates and times, whereas the periodization is the granularity of data aggregation (e.g. data points every 1 minute or every 1 hour). Last, the dimensions (e.g. client host-type) are various segmentations of the data, typically with a low cardinality set of values which we will explain in a bit.
Timestream databases are built for use-cases like this and would be highly recommended for a production monitoring service. For this exercise, rather than using an out-of-the-box timestream database like InfluxDB or Amazon Timestream, we will go a bit deeper by using the very popular no-sql DynamoDB and walking through the schema-design. Each end-client will have a separate set of tables.
Metric data under 12 hours tends to be accessed very frequently, whereas the older metric data becomes the less frequently it becomes queried. Therefore we should design our system to be optimized for querying the most-recent metric data.
Variable Periodization and Cold Storage:
To performantly support a wide range of queries from small time windows (e.g. last 1 minute) to very-large time windows (e.g. last 1 year), support for various levels of sampling will be needed (e.g. 5 seconds, 1 minute, 5 minutes, 1 hour, 1 day). Downsampling is the process of converting a high-resolution metric (e.g. 5 seconds) to a lower-resolution metric (e.g. 5 minutes). To keep read-performance high we don’t want to perform this sampling and aggregation of data points into selected periods on the read-path, but instead store the data in an optimized manner for each level of periodization. This increases storage costs but keeps query latency low.
We will maintain separate metric tables for each level of periodization we support: 5 seconds, 1 minute, 5 minutes, 1 hour, 1 day. Different levels of periodization in each table can leverage separate time-to-live (TTL) properties. Higher-resolution metrics will be only kept for 1 week, whereas lower-resolution metrics may be kept for 1 year. This keeps storage-costs and query-latency under control. For example, querying a metric with a 5-second period over a 1 year window would be slow and not very useful with over 6 million data points.
The database will require high availability and therefore will replicate data across multiple database nodes, which DynamoDB will handle internally for us. Further, we can partition our database tables by region, client, and metric as needed for high scalability.
Processing consumers will dequeue metrics and store them in DynamoDB for the highest-resolution metric, then a separate set of workers will aggregate those high-resolution metrics into metrics for each periodization level. Though the processing consumers themselves could potentially write to multiple tables per periodization level, keeping those workers separate enables independent scaling and ensures minimum processing time for the highest-resolution metrics. Another option would be to have separate sets of Kafka processing consumers per periodization-level. Another benefit of having a separate set of hosts for various periodization levels is that it can support moving older metrics data to disk-based or cold-storage such as S3 or AWS Glacier, though this of course impacts read-latency of that older metric data.
Labels and Dimensions:
We will need to support labels/dimensions as well so metrics can be tagged with various attributes. For example: host-type, browser-type, or user-language. Dimensions should have a limited set of possible values (low cardinality). This is due to how dimensions must be stored. Each unique combination of dimensions becomes a unique row and more dimensions may require additional database indexes, which in turn may impact write and/or read-after-write latency. In other words, adding additional dimensions to metrics drastically increases the volume of data and writes. We will discuss how to handle high-cardinality metrics as a bonus later.
Data Model:
Separate tables in different AWS regions will contain data for that geographic-region; we will keep multi-region disaster recovery out-of-scope for this solution and may explore it in a future article. We will also maintain separate tables per periodization level. For example, the base 5-second metric table would have rows every 5 seconds, whereas the 1-hour metric table would have rows every 1 hour. Aggregation processors will aggregate the high-resolution metrics into lower-resolution metrics.
The data model will use a compound key including partition-key and sort-key. The partition-key is made up of the metric name, whereas the sort-key is made up of the timestamp. Queries with dimensions may be supported with secondary indexes (more performant querying, more expensive) and/or using query filters (far less performant querying, cheaper). Together this allows for querying by (1) metric name, (2) metric name + time range, or (3) metric name + time range + dimensions. Together the metric name, timestamp, and dimensions uniquely identify a row in the table per region and periodization. This schema allows for optimized queries without full-page scans which are highly discouraged in DynamoDB.
Querying Metrics:
As our two client services need to query the service they will use a common data access service to read metrics data. The access-pattern will take a (1) metric-name, (2) geographic-region, (3) time-window, (4) periodization, and optionally a set of (5) dimensions. Since metrics have been precomputed into various levels of periodization and stored in an optimal manner, queries will be inexpensive and quick.
Bonus - High Cardinality Metrics:
High-cardinality labeled metrics are tricky given scalability and storage limitations. A dimension with 1 million unique values (e.g. user-id, ip-address) would result in an exponential number of rows which doesn’t scale. This is a more general computer-science challenge known as the “count-distinct problem”. To achieve high-cardinality labeled metrics, we can use a class of probabilistic algorithms to estimate distinct counts. A common algorithm and one used in Redis and Amazon Redshift in recent years is Hyper-Log-Log (HLL), published in 2007. A follow-up post will dive deeper into Hyper-Log-Log, but for now HLL works high-level by hashing each value, calculating the leading number of zeros in the binary representation of each hashed value in the set, and then taking the harmonic-mean of these estimates. A great write-up of this algorithm can be read here.
Alerts
Alerts will require a UI for configuring them, persistence to store the alert configurations, and a service to periodically check metrics against alert thresholds. The frontend could be server-side rendered or a single-page-application with frontend-based routing. In either case the Alerts service will leverage the Query API to get metrics data at low-latency. These calls will happen frequently up to every 1 second as defined per alarm/alert/monitor. Horizontal scaling is crucial here at all layers. After making the metrics query the Alerts service will determine which monitors are breached to send alerts for. Related breached monitors may be merged into a single alert. The service then sends alerts through a configurable output channel. Channels may include email, SMS, push notifications, and/or service-to-service events.
Dashboards
Dashboards will require a UI for configuring them, persistence to store the dashboard configurations, and a service to serve the dashboards. Like the Alerts, the frontend could be server-side rendered or a single-page-application with frontend-based routing. In either case the Dashboard service will leverage the Query API to get metrics data at low-latency. These calls will happen far less frequently than the Alerts with the exception of real time dashboards. For real time dashboards, the call frequency will be much higher and therefore costlier. The frontend React app will leverage frontend graphing libraries such as D3 to chart metrics and render dashboards.
Conclusion
Here, we’ve covered high-level how to build a large-scale, highly-available, and fault-tolerant solution for monitoring and software observability. We’ve discussed various complexities in data deduplication, minimizing data volume, ensuring data recovery, horizontal scaling, and storage optimizations. Now, next time you use your off-the-shelf monitoring solution you will have a more in-depth understanding of how that solution is working internally. Check back for additional system-design write-ups.