Editor’s note: Hadoop uses the term “master” to describe its architecture and certain metric names. Datadog does not use this term. Within this blog post, we will use the term “leader node” instead, except in instances where we must reference a specific metric name for the sake of clarity.
This post is part 2 of a 4-part series on monitoring Hadoop health and performance. If you’ve read our guide to Hadoop architecture, you know about the components that make up a typical Hadoop cluster. In this post, we’ll dive deep into each of the technologies introduced in that guide and explore the key metrics exposed by Hadoop that you should keep your eye on.
Service-oriented monitoring
From an operations perspective, Hadoop clusters are incredibly resilient in the face of system failures. Hadoop was designed with failure in mind and can tolerate entire racks going down.
Monitoring Hadoop requires a different mindset than monitoring most other systems: you generally don’t care if a single worker node fails, although failure of a leader node, such as one of your NameNodes, will require fairly rapid remediation. Generally speaking, individual host-level indicators are less important than service-level metrics when it comes to Hadoop.
Key Hadoop performance metrics to monitor
When working properly, a Hadoop cluster can handle a truly massive amount of data—there are plenty of production clusters managing petabytes of data each. Monitoring each of Hadoop’s sub-components is essential to keep jobs running and the cluster humming.
Hadoop metrics can be broken down into four broad categories:
- HDFS metrics
- MapReduce counters
- YARN metrics
- ZooKeeper metrics

To learn more about collecting Hadoop and ZooKeeper metrics, take a look at part 3 of this series.

HDFS metrics
DataNodes communicate with the NameNode (and vice versa) via a periodic heartbeat (by default every three seconds, set by dfs.heartbeat.interval). The heartbeat contains (amongst other things) health information about the DataNode. It is through this mechanism that the NameNode is able to know the state of all the DataNodes in the cluster.
NameNode metrics

Name | Description | Metric Type |
---|---|---|
CapacityRemaining | Available capacity | Resource: Utilization |
CorruptBlocks/MissingBlocks | Number of corrupt/missing blocks | Resource: Error/Resource: Availability |
VolumeFailuresTotal | Number of failed volumes | Resource: Error |
NumLiveDataNodes/NumDeadDataNodes | Count of alive DataNodes/Count of dead DataNodes | Resource: Availability |
FilesTotal | Total count of files tracked by the NameNode | Resource: Utilization |
TotalLoad | Measure of file access across all DataNodes | Resource: Utilization |
BlockCapacity/BlocksTotal | Maximum number of blocks allocable/Count of blocks tracked by NameNode | Resource: Utilization |
UnderReplicatedBlocks | Count of under-replicated blocks | Resource: Availability |
NumStaleDataNodes | Count of stale DataNodes | Resource: Availability |
Metric to alert on: CapacityRemaining
CapacityRemaining represents the total available capacity remaining across the entire HDFS cluster. It is an understatement to say that running out of disk space in HDFS will cause BadThingsToHappen™. DataNodes that are out of space are likely to fail on boot, and of course, DataNodes that are already full cannot be written to. In addition, any running jobs which write out temporary data may fail due to lack of capacity. It is good practice to ensure that disk use never exceeds 80 percent capacity.
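If you want a quick check outside of a full monitoring platform, the sketch below polls the NameNode’s JMX endpoint for CapacityRemaining and warns when the cluster crosses the 80 percent mark. The host name is a placeholder, the port assumes the default Hadoop 3.x web UI (9870; use 50070 on 2.x), and an unauthenticated endpoint exposing the FSNamesystem MBean is assumed:

```python
# Minimal sketch: warn when HDFS disk use crosses 80 percent, using the
# NameNode's JMX-over-HTTP endpoint. Host, port, and the assumption of an
# unauthenticated endpoint are placeholders for this example.
import requests

NAMENODE_JMX = "http://namenode.example.com:9870/jmx"  # hypothetical NameNode host

def check_hdfs_capacity(threshold=0.80):
    # The FSNamesystem MBean reports CapacityTotal/CapacityRemaining in bytes.
    resp = requests.get(NAMENODE_JMX,
                        params={"qry": "Hadoop:service=NameNode,name=FSNamesystem"})
    bean = resp.json()["beans"][0]
    used_fraction = 1 - bean["CapacityRemaining"] / bean["CapacityTotal"]
    if used_fraction > threshold:
        print(f"WARNING: HDFS is {used_fraction:.0%} full "
              f"({bean['CapacityRemaining'] / 2**30:.1f} GiB remaining)")
    return used_fraction

if __name__ == "__main__":
    check_hdfs_capacity()
```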
Thankfully, HDFS exposes its remaining capacity through this metric, so you can alert and act well before the cluster runs out of space.

FilesTotal: FilesTotal is a running count of the files tracked by the NameNode. Because the NameNode keeps all file system metadata in memory, the more files it tracks, the more memory it requires. The rule of thumb for NameNode memory requirements is that each object (file, directory, block) tracked by the NameNode consumes roughly 150 bytes of memory. If each file is mapped to one block (files often span multiple blocks), that is approximately 300 bytes per file. That doesn’t sound like much until you factor in the replication of data: the default replication factor is 3 (so 900 bytes per file), and each replica adds an additional 16 bytes, for a total of ~1 KB of metadata per file. That still may not sound like much, but as your cluster expands to include millions of files, you should keep this number in mind and plan to add additional memory to your NameNode.
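To put that rule of thumb into practice, here is a back-of-the-envelope estimate of NameNode metadata size as a function of file count; the numbers simply restate the per-object figures above, and the inputs are illustrative:

```python
# Back-of-the-envelope NameNode memory estimate, following the arithmetic above:
# ~150 bytes per object (file plus block) multiplied by the replication factor,
# plus ~16 bytes per replica. Inputs are illustrative, not measured values.
BYTES_PER_OBJECT = 150   # file, directory, or block entry
REPLICA_OVERHEAD = 16    # additional bytes per replica

def estimate_metadata_bytes(num_files, blocks_per_file=1, replication=3):
    per_file = (BYTES_PER_OBJECT * (1 + blocks_per_file) * replication
                + REPLICA_OVERHEAD * blocks_per_file * replication)  # ~948 B with defaults
    return num_files * per_file

# e.g. 10 million single-block files at the default replication factor of 3
print(f"~{estimate_metadata_bytes(10_000_000) / 2**30:.1f} GiB of NameNode metadata")
```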
TotalLoad is the current number of concurrent file accesses (read/write) across all DataNodes. This metric is not usually indicative of a problem, though it can illuminate issues related to job execution. Because worker nodes running the DataNode daemon also perform MapReduce tasks, extended periods of high I/O (indicated by high TotalLoad) generally translate to degraded job execution performance. Tracking TotalLoad over time can help get to the bottom of job performance issues.
BlockCapacity/BlocksTotal: Like FilesTotal, keeping an eye on the total number of blocks across the cluster is essential to continued operation. Block capacity and file capacity are very closely related but not the same (files can span multiple blocks); monitoring both block and file capacity is necessary to ensure uninterrupted operation. If your cluster is approaching the limits of block capacity, you can easily add more by bringing up a new DataNode and adding it to the pool, or by adding more disks to existing nodes. Once added, you can run the HDFS balancer to redistribute blocks across the cluster.

UnderReplicatedBlocks: HDFS replicates each block across multiple DataNodes; the replication factor can be set on a per-file basis. The default replication factor is three, meaning that each block will be stored on three DataNodes. If you see a large, sudden spike in the number of under-replicated blocks, it is likely that a DataNode has died—this can be verified by correlating under-replicated block metrics with the status of DataNodes (via NumLiveDataNodes and related metrics).
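As a rough illustration of that correlation, the sketch below reads UnderReplicatedBlocks alongside the DataNode liveness counts from the NameNode’s JMX endpoint. The host and port are placeholders, and the bean and attribute names assume the FSNamesystemState MBean found on recent Hadoop releases; verify them against your own /jmx output:

```python
# Minimal sketch: read UnderReplicatedBlocks together with DataNode liveness
# counts so a replication spike can be tied back to a dead node. Host, port,
# and bean/attribute names are assumptions; check your own /jmx output.
import requests

NAMENODE_JMX = "http://namenode.example.com:9870/jmx"  # hypothetical NameNode host

resp = requests.get(NAMENODE_JMX,
                    params={"qry": "Hadoop:service=NameNode,name=FSNamesystemState"})
bean = resp.json()["beans"][0]
under_replicated = bean.get("UnderReplicatedBlocks", 0)
dead = bean.get("NumDeadDataNodes", 0)
live = bean.get("NumLiveDataNodes", 0)

if under_replicated and dead:
    print(f"{under_replicated} under-replicated blocks with {dead} dead DataNode(s) "
          f"({live} still alive): likely a DataNode failure")
```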
NumStaleDataNodes: Every three seconds (by default), DataNodes send a heartbeat message to the NameNode. This heartbeat acts as a health-check mechanism but can also be used by the NameNode to delegate new work. DataNodes that do not send a heartbeat within 30 seconds (a configurable interval) are marked as “stale.” A DataNode might not emit a heartbeat for a number of reasons—network issues or high load are among the more common causes. Unless you’ve disabled reads or writes to stale DataNodes, you should definitely keep an eye on this metric. A DataNode that did not emit a heartbeat due to high system load (or a slow network) will likely cause any reads or writes to that DataNode to be delayed as well. However, in practice, a stale DataNode is likely dead.
Even if reading and writing on stale DataNodes is disabled, you should still monitor for sudden increases in the number of stale DataNodes, because in a short time stale nodes can easily become dead nodes. And stale DataNodes will be written to regardless of your settings if the ratio of stale DataNodes to total DataNodes exceeds the value of dfs.namenode.write.stale.datanode.ratio.
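If you want to watch that ratio yourself, a minimal sketch (with the same assumptions about the JMX endpoint and bean names as the earlier examples) might look like the following; the 0.5 threshold mirrors the default value of dfs.namenode.write.stale.datanode.ratio, so adjust it to match your configuration:

```python
# Minimal sketch: compare the stale-DataNode ratio against the threshold at which
# HDFS stops avoiding writes to stale nodes. Host, port, and bean names are
# assumptions, as in the earlier examples.
import requests

NAMENODE_JMX = "http://namenode.example.com:9870/jmx"  # hypothetical NameNode host
WRITE_STALE_RATIO = 0.5  # mirror dfs.namenode.write.stale.datanode.ratio (0.5 by default)

bean = requests.get(NAMENODE_JMX,
                    params={"qry": "Hadoop:service=NameNode,name=FSNamesystemState"}
                    ).json()["beans"][0]
total = bean["NumLiveDataNodes"] + bean["NumDeadDataNodes"]
stale_ratio = bean["NumStaleDataNodes"] / total if total else 0.0
if stale_ratio >= WRITE_STALE_RATIO:
    print(f"Stale ratio {stale_ratio:.2f} >= {WRITE_STALE_RATIO}: "
          "HDFS will no longer avoid writing to stale DataNodes")
```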
NameNode JVM metrics
Because the NameNode runs in the Java Virtual Machine (JVM), it relies on Java garbage collection processes to free up memory. The more activity in your HDFS cluster, the more often the garbage collection will run.
Anyone familiar with Java applications knows that garbage collection can come with a high performance cost.

Name | Description | Metric Type |
---|---|---|
ConcurrentMarkSweep count | Number of old-generation collections | Other |
ConcurrentMarkSweep time | Elapsed time of old-generation collections, in milliseconds | Other |
If CMS is taking more than a few seconds to complete, or is occurring with increased frequency, your NameNode may not have enough memory allocated to the JVM to function efficiently.
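One lightweight way to eyeball this is to sample the CMS collector’s cumulative statistics twice and look at the delta, as sketched below. This assumes the NameNode exposes the standard java.lang GarbageCollector MBean through its /jmx servlet and is actually running the CMS collector; the host and port are placeholders:

```python
# Minimal sketch: sample the NameNode's CMS collector twice and report how much
# time was spent in old-generation collections in between. Host and port are
# placeholders; the bean below only exists if the JVM is running the CMS collector.
import time
import requests

NAMENODE_JMX = "http://namenode.example.com:9870/jmx"  # hypothetical NameNode host
CMS_BEAN = "java.lang:type=GarbageCollector,name=ConcurrentMarkSweep"

def cms_stats():
    bean = requests.get(NAMENODE_JMX, params={"qry": CMS_BEAN}).json()["beans"][0]
    return bean["CollectionCount"], bean["CollectionTime"]  # count, cumulative ms

count_before, millis_before = cms_stats()
time.sleep(60)
count_after, millis_after = cms_stats()
print(f"{count_after - count_before} CMS collections, "
      f"{millis_after - millis_before} ms collecting, over the last 60 s")
```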
DataNode metrics
DataNode metrics are host-level metrics specific to a particular DataNode.
Name | Description | Metric Type |
---|---|---|
Remaining | Remaining disk space | Resource: Utilization |
NumFailedVolumes | Number of failed storage volumes | Resource: Error |
Remaining: As mentioned in the section on NameNode metrics, running out of disk space can cause a number of problems in a cluster. If left unrectified, a single DataNode running out of space could quickly cascade into failures across the entire cluster as data is written to an ever-shrinking pool of available DataNodes. Tracking this metric over time is essential to maintain a healthy cluster; you may want to alert on this metric when the remaining space falls dangerously low (less than 10 percent).
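A rough sketch of such a check is below. The host names are placeholders, the port assumes the Hadoop 3.x DataNode web UI (9864; 50075 on 2.x), and the FSDatasetState* bean pattern may differ between versions, so confirm the exact bean name by browsing a DataNode’s /jmx page first:

```python
# Minimal sketch: flag DataNodes with less than 10 percent of their capacity
# remaining. Hosts, port, and the FSDatasetState* bean pattern are assumptions;
# browse a DataNode's /jmx page to confirm the bean name for your version.
import requests

DATANODES = ["datanode1.example.com", "datanode2.example.com"]  # hypothetical hosts

for host in DATANODES:
    resp = requests.get(f"http://{host}:9864/jmx",
                        params={"qry": "Hadoop:service=DataNode,name=FSDatasetState*"})
    for bean in resp.json().get("beans", []):
        remaining, capacity = bean.get("Remaining"), bean.get("Capacity")
        if remaining is not None and capacity:
            fraction_left = remaining / capacity
            if fraction_left < 0.10:
                print(f"{host}: only {fraction_left:.0%} of DataNode capacity remaining")
```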
NumFailedVolumes: By default, a single volume failing on a DataNode will cause the entire node to go offline. Depending on your environment, that may not be desired, especially since most DataNodes operate with multiple disks. If you haven’t set dfs.datanode.failed.volumes.tolerated in your HDFS configuration, you should definitely track this metric. Remember, when a DataNode is taken offline, the NameNode must copy any under-replicated blocks that were lost on that node, causing a burst in network traffic and potential performance degradation. To prevent a slew of network activity from the failure of a single disk (if your use case permits), you should set the tolerated level of volume failure in your config like so:
<property>
<name>dfs.datanode.failed.volumes.tolerated</name>
<value>N</value>
</property>
where N is the number of failures a DataNode should tolerate before shutting down.
Don’t forget to restart your DataNode process following a configuration change.
The downside to enabling this feature is that, with fewer disks to write to, a DataNode’s performance may suffer. A reasonable value for tolerated failures will depend on your use case.
MapReduce counters
The MapReduce framework exposes a number of counters to track statistics on MapReduce job execution. Counters are an invaluable mechanism that let you see what is actually happening during a MapReduce job run. They provide a count of the number of times an event occurred. MapReduce counters can be broken down into four categories:
- job counters
- task counters
- custom counters
- file system counters
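To make the custom-counter category concrete: with Hadoop Streaming, a mapper can increment a counter simply by writing a specially formatted line to stderr. The sketch below counts malformed input records; the group and counter names are made up for the example:

```python
# mapper.py -- counts malformed records via a custom counter while emitting
# tab-separated key/value pairs. Group ("MyApp") and counter ("MalformedRecords")
# names are made up for this example.
import sys

def main():
    for line in sys.stdin:
        fields = line.rstrip("\n").split("\t")
        if len(fields) < 2:
            # Hadoop Streaming increments a counter when a task writes
            # "reporter:counter:<group>,<counter>,<amount>" to stderr.
            sys.stderr.write("reporter:counter:MyApp,MalformedRecords,1\n")
            continue
        print(f"{fields[0]}\t{fields[1]}")  # key <tab> value

if __name__ == "__main__":
    main()
```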
MapReduce application developers may want to track the following counters to find bottlenecks and potential optimizations in their applications.
Job counters
Group name: org.apache.hadoop.mapreduce.JobCounter
Counter Name | Description | Metric Type |
---|---|---|
MILLIS_MAPS/MILLIS_REDUCES | Processing time for all map/reduce tasks (in milliseconds) | Work: Performance |

Task counters
Group name: org.apache.hadoop.mapreduce.TaskCounter

COMBINE_INPUT_RECORDS/COMBINE_OUTPUT_RECORDS: The number of records fed into and emitted by the combine step.

SPILLED_RECORDS: Map output data is stored in memory, but when the buffer fills up, data is dumped to disk. A separate thread is spawned to merge all of the spilled data into a single larger sorted file for reducers. Spills to disk should be minimized, as more than one spill will result in an increase in disk activity as the merging thread has to read and write data to disk. Tracking this metric can help determine if configuration changes need to be made (like increasing the memory available for map tasks) or if additional processing (compression) of the input data is necessary.

GC_TIME_MILLIS: Useful for determining the percentage of time tasks spent performing garbage collection. See the section on MILLIS_MAPS/MILLIS_REDUCES for more information.

Custom counters
MapReduce allows users to implement custom counters of their own, for example to count the number of malformed or missing records. Keep in mind, however, that since all counters reside in the JobTracker JVM’s memory, there is a practical limit to the number of counters you should use, bounded by the amount of physical memory available. (Hadoop also imposes a configurable limit on the number of counters per job.)

YARN metrics
YARN metrics can be grouped into three distinct categories.

Cluster metrics
Cluster metrics provide a high-level view of YARN application execution. These metrics are aggregated cluster-wide.
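For ad hoc inspection, cluster-level YARN metrics are also available from the ResourceManager’s REST API, as sketched below. The host name is a placeholder, the default webapp port (8088) and an unauthenticated cluster are assumed, and the exact set of fields returned varies by Hadoop version:

```python
# Minimal sketch: pull a few cluster-wide YARN metrics from the ResourceManager's
# REST API. Host is a placeholder, the default webapp port (8088) and an
# unauthenticated cluster are assumed, and field availability varies by version.
import requests

RM_METRICS_URL = "http://resourcemanager.example.com:8088/ws/v1/cluster/metrics"

cluster = requests.get(RM_METRICS_URL).json().get("clusterMetrics", {})
for field in ("appsRunning", "appsPending", "containersAllocated",
              "activeNodes", "unhealthyNodes", "lostNodes"):
    print(f"{field}: {cluster.get(field)}")
```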
NodeManager metrics

containersFailed tracks the number of containers that failed to launch on that particular NodeManager. Containers can fail to launch for a number of reasons; for example, if the memory allotted to containers (or the JVM heap within them) is set too low, containers won’t launch. You don’t need to be alerted for every container that failed to launch, but if a single node has many failed containers, it may be time to investigate.

ZooKeeper metrics
ZooKeeper plays an important role in a Hadoop deployment: it is responsible for ensuring the availability of the NameNode and ResourceManager in high-availability deployments.

Name | Description | Metric Type |
---|---|---|
zk_followers | Number of active followers | Resource: Availability |
zk_avg_latency | Amount of time it takes to respond to a client request (in ms) | Work: Performance |
zk_num_alive_connections | Number of clients connected to ZooKeeper | Resource: Availability |
Metric to alert on: zk_followers (leader only)
The number of followers should equal the total size of your ZooKeeper ensemble, minus 1 (the leader is not included in the follower count). If the ensemble fails to maintain quorum, all automatic failover features are suspended. Changes to this value should be alerted on, as the size of your ensemble should only change due to user intervention (e.g., an administrator decommissioned or commissioned a node).
zk_avg_latency: The average request latency is the average time it takes (in milliseconds) for ZooKeeper to respond to a request. ZooKeeper will not respond to a request until it has written the transaction to its transaction log. Any latency spikes could delay the transition between servers in the event of a ResourceManager or NameNode failure. Of course, such events are rare, but by tracking the average latency over time you can better correlate errors that may have been caused by stale data.
zk_num_alive_connections: ZooKeeper reports the number of connected clients via this metric. This represents all connections, including connections from non-ZooKeeper nodes. In most environments, this number should remain fairly static—generally, your number of NameNodes, ResourceManagers, and their respective standbys should remain relatively stable. You should be aware of unanticipated drops in this value; since Hadoop uses ZooKeeper to provide high availability, a loss of connection to ZooKeeper could make the cluster less resilient to node loss.
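All three of these zk_* metrics can be pulled directly from a ZooKeeper server with the mntr four-letter command, as in the sketch below. The host is a placeholder; note that zk_followers is reported only by the ensemble leader, and on ZooKeeper 3.5+ mntr must be included in 4lw.commands.whitelist for the command to work:

```python
# Minimal sketch: fetch ZooKeeper's mntr output over a raw TCP connection and
# print the metrics discussed above. The host is a placeholder; zk_followers is
# reported only by the leader, and ZooKeeper 3.5+ requires mntr to be whitelisted.
import socket

def zk_mntr(host="zookeeper.example.com", port=2181):  # hypothetical host
    with socket.create_connection((host, port), timeout=5) as sock:
        sock.sendall(b"mntr")
        data = b""
        while chunk := sock.recv(4096):
            data += chunk
    # mntr returns one "key<TAB>value" pair per line
    return dict(line.split("\t", 1)
                for line in data.decode().splitlines() if "\t" in line)

metrics = zk_mntr()
for key in ("zk_followers", "zk_avg_latency", "zk_num_alive_connections"):
    print(f"{key}: {metrics.get(key)}")
```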
Go forth, and collect!
In this post we’ve explored many of the key metrics you should monitor to keep tabs on the health and performance of your Hadoop cluster.
The metrics covered in this post pertain to the core Hadoop components and their typical use. Given Hadoop’s vast ecosystem, over time you will likely identify additional metrics that are particularly relevant to your specific Hadoop infrastructure and its users.