Monitoring and Instrumentation

There are several ways to monitor Spark applications: web UIs, metrics, and external instrumentation.

Web Interfaces

Every SparkContext launches a Web UI, by default on port 4040, that displays useful information about the application. This includes:

You can access this interface by simply opening http://<driver-node>:4040 in a web browser. If multiple SparkContexts are running on the same host, they will bind to successive ports beginning with 4040 (4041, 4042, etc).

Note that this information is only available for the duration of the application by default. To view the web UI after the fact, set spark.eventLog.enabled to true before starting the application. This configures Spark to log Spark events that encode the information displayed in the UI to persisted storage.

Viewing After the Fact

It is still possible to construct the UI of an application through Spark’s history server, provided that the application’s event logs exist. You can start the history server by executing:

./sbin/start-history-server.sh

This creates a web interface at http://<server-url>:18080 by default, listing incomplete and completed applications and attempts.

When using the file-system provider class (see spark.history.provider below), the base logging directory must be supplied in the spark.history.fs.logDirectory configuration option, and should contain sub-directories that each represents an application’s event logs.

The spark jobs themselves must be configured to log events, and to log them to the same shared, writable directory. For example, if the server was configured with a log directory of hdfs://namenode/shared/spark-logs, then the client-side options would be:

spark.eventLog.enabled true
spark.eventLog.dir hdfs://namenode/shared/spark-logs

The history server can be configured as follows:

Environment Variables

Environment VariableMeaning
SPARK_DAEMON_MEMORY Memory to allocate to the history server (default: 1g).
SPARK_DAEMON_JAVA_OPTS JVM options for the history server (default: none).
SPARK_DAEMON_CLASSPATH Classpath for the history server (default: none).
SPARK_PUBLIC_DNS The public address for the history server. If this is not set, links to application history may use the internal address of the server, resulting in broken links (default: none).
SPARK_HISTORY_OPTS spark.history.* configuration options for the history server (default: none).

Spark History Server Configuration Options

Security options for the Spark History Server are covered more detail in the Security page.

Property NameDefaultMeaning
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider Name of the class implementing the application history backend. Currently there is only one implementation, provided by Spark, which looks for application logs stored in the file system.
spark.history.fs.logDirectory file:/tmp/spark-events For the filesystem history provider, the URL to the directory containing application event logs to load. This can be a local file:// path, an HDFS path hdfs://namenode/shared/spark-logs or that of an alternative filesystem supported by the Hadoop APIs.
spark.history.fs.update.interval 10s The period at which the filesystem history provider checks for new or updated logs in the log directory. A shorter interval detects new applications faster, at the expense of more server load re-reading updated applications. As soon as an update has completed, listings of the completed and incomplete applications will reflect the changes.
spark.history.retainedApplications 50 The number of applications to retain UI data for in the cache. If this cap is exceeded, then the oldest applications will be removed from the cache. If an application is not in the cache, it will have to be loaded from disk if it is accessed from the UI.
spark.history.ui.maxApplications Int.MaxValue The number of applications to display on the history summary page. Application UIs are still available by accessing their URLs directly even if they are not displayed on the history summary page.
spark.history.ui.port 18080 The port to which the web interface of the history server binds.
spark.history.kerberos.enabled false Indicates whether the history server should use kerberos to login. This is required if the history server is accessing HDFS files on a secure Hadoop cluster. If this is true, it uses the configs spark.history.kerberos.principal and spark.history.kerberos.keytab.
spark.history.kerberos.principal (none) Kerberos principal name for the History Server.
spark.history.kerberos.keytab (none) Location of the kerberos keytab file for the History Server.
spark.history.fs.cleaner.enabled false Specifies whether the History Server should periodically clean up event logs from storage.
spark.history.fs.cleaner.interval 1d How often the filesystem job history cleaner checks for files to delete. Files are deleted if at least one of two conditions holds. First, they're deleted if they're older than spark.history.fs.cleaner.maxAge. They are also deleted if the number of files is more than spark.history.fs.cleaner.maxNum, Spark tries to clean up the completed attempts from the applications based on the order of their oldest attempt time.
spark.history.fs.cleaner.maxAge 7d Job history files older than this will be deleted when the filesystem history cleaner runs.
spark.history.fs.cleaner.maxNum Int.MaxValue The maximum number of files in the event log directory. Spark tries to clean up the completed attempt logs to maintain the log directory under this limit. This should be smaller than the underlying file system limit like `dfs.namenode.fs-limits.max-directory-items` in HDFS.
spark.history.fs.endEventReparseChunkSize 1m How many bytes to parse at the end of log files looking for the end event. This is used to speed up generation of application listings by skipping unnecessary parts of event log files. It can be disabled by setting this config to 0.
spark.history.fs.inProgressOptimization.enabled true Enable optimized handling of in-progress logs. This option may leave finished applications that fail to rename their event logs listed as in-progress.
spark.history.fs.driverlog.cleaner.enabled spark.history.fs.cleaner.enabled Specifies whether the History Server should periodically clean up driver logs from storage.
spark.history.fs.driverlog.cleaner.interval spark.history.fs.cleaner.interval How often the filesystem driver log cleaner checks for files to delete. Files are only deleted if they are older than spark.history.fs.driverlog.cleaner.maxAge
spark.history.fs.driverlog.cleaner.maxAge spark.history.fs.cleaner.maxAge Driver log files older than this will be deleted when the driver log cleaner runs.
spark.history.fs.numReplayThreads 25% of available cores Number of threads that will be used by history server to process event logs.
spark.history.store.maxDiskUsage 10g Maximum disk usage for the local directory where the cache application history information are stored.
spark.history.store.path (none) Local directory where to cache application history data. If set, the history server will store application data on disk instead of keeping it in memory. The data written to disk will be re-used in the event of a history server restart.
spark.history.custom.executor.log.url (none) Specifies custom spark executor log URL for supporting external log service instead of using cluster managers' application log URLs in the history server. Spark will support some path variables via patterns which can vary on cluster manager. Please check the documentation for your cluster manager to see which patterns are supported, if any. This configuration has no effect on a live application, it only affects the history server.

For now, only YARN mode supports this configuration

spark.history.custom.executor.log.url.applyIncompleteApplication false Specifies whether to apply custom spark executor log URL to incomplete applications as well. If executor logs for running applications should be provided as origin log URLs, set this to `false`. Please note that incomplete applications may include applications which didn't shutdown gracefully. Even this is set to `true`, this configuration has no effect on a live application, it only affects the history server.

Note that in all of these UIs, the tables are sortable by clicking their headers, making it easy to identify slow tasks, data skew, etc.

Note

  1. The history server displays both completed and incomplete Spark jobs. If an application makes multiple attempts after failures, the failed attempts will be displayed, as well as any ongoing incomplete attempt or the final successful attempt.

  2. Incomplete applications are only updated intermittently. The time between updates is defined by the interval between checks for changed files (spark.history.fs.update.interval). On larger clusters, the update interval may be set to large values. The way to view a running application is actually to view its own web UI.

  3. Applications which exited without registering themselves as completed will be listed as incomplete —even though they are no longer running. This can happen if an application crashes.

  4. One way to signal the completion of a Spark job is to stop the Spark Context explicitly (sc.stop()), or in Python using the with SparkContext() as sc: construct to handle the Spark Context setup and tear down.

REST API

In addition to viewing the metrics in the UI, they are also available as JSON. This gives developers an easy way to create new visualizations and monitoring tools for Spark. The JSON is available for both running applications, and in the history server. The endpoints are mounted at /api/v1. Eg., for the history server, they would typically be accessible at http://<server-url>:18080/api/v1, and for a running application, at http://localhost:4040/api/v1.

In the API, an application is referenced by its application ID, [app-id]. When running on YARN, each application may have multiple attempts, but there are attempt IDs only for applications in cluster mode, not applications in client mode. Applications in YARN cluster mode can be identified by their [attempt-id]. In the API listed below, when running in YARN cluster mode, [app-id] will actually be [base-app-id]/[attempt-id], where [base-app-id] is the YARN application ID.

EndpointMeaning
/applications A list of all applications.
?status=[completed|running] list only applications in the chosen state.
?minDate=[date] earliest start date/time to list.
?maxDate=[date] latest start date/time to list.
?minEndDate=[date] earliest end date/time to list.
?maxEndDate=[date] latest end date/time to list.
?limit=[limit] limits the number of applications listed.
Examples:
?minDate=2015-02-10
?minDate=2015-02-03T16:42:40.000GMT
?maxDate=2015-02-11T20:41:30.000GMT
?minEndDate=2015-02-12
?minEndDate=2015-02-12T09:15:10.000GMT
?maxEndDate=2015-02-14T16:30:45.000GMT
?limit=10
/applications/[app-id]/jobs A list of all jobs for a given application.
?status=[running|succeeded|failed|unknown] list only jobs in the specific state.
/applications/[app-id]/jobs/[job-id] Details for the given job.
/applications/[app-id]/stages A list of all stages for a given application.
?status=[active|complete|pending|failed] list only stages in the state.
/applications/[app-id]/stages/[stage-id] A list of all attempts for the given stage.
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id] Details for the given stage attempt.
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummary Summary metrics of all tasks in the given stage attempt.
?quantiles summarize the metrics with the given quantiles.
Example: ?quantiles=0.01,0.5,0.99
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskList A list of all tasks for the given stage attempt.
?offset=[offset]&length=[len] list tasks in the given range.
?sortBy=[runtime|-runtime] sort the tasks.
Example: ?offset=10&length=50&sortBy=runtime
/applications/[app-id]/executors A list of all active executors for the given application.
/applications/[app-id]/executors/[executor-id]/threads Stack traces of all the threads running within the given active executor. Not available via the history server.
/applications/[app-id]/allexecutors A list of all(active and dead) executors for the given application.
/applications/[app-id]/storage/rdd A list of stored RDDs for the given application.
/applications/[app-id]/storage/rdd/[rdd-id] Details for the storage status of a given RDD.
/applications/[base-app-id]/logs Download the event logs for all attempts of the given application as files within a zip file.
/applications/[base-app-id]/[attempt-id]/logs Download the event logs for a specific application attempt as a zip file.
/applications/[app-id]/streaming/statistics Statistics for the streaming context.
/applications/[app-id]/streaming/receivers A list of all streaming receivers.
/applications/[app-id]/streaming/receivers/[stream-id] Details of the given receiver.
/applications/[app-id]/streaming/batches A list of all retained batches.
/applications/[app-id]/streaming/batches/[batch-id] Details of the given batch.
/applications/[app-id]/streaming/batches/[batch-id]/operations A list of all output operations of the given batch.
/applications/[app-id]/streaming/batches/[batch-id]/operations/[outputOp-id] Details of the given operation and given batch.
/applications/[app-id]/environment Environment details of the given application.
/version Get the current spark version.

The number of jobs and stages which can be retrieved is constrained by the same retention mechanism of the standalone Spark UI; "spark.ui.retainedJobs" defines the threshold value triggering garbage collection on jobs, and spark.ui.retainedStages that for stages. Note that the garbage collection takes place on playback: it is possible to retrieve more entries by increasing these values and restarting the history server.

Executor Task Metrics

The REST API exposes the values of the Task Metrics collected by Spark executors with the granularity of task execution. The metrics can be used for performance troubleshooting and workload characterization. A list of the available metrics, with a short description:

Spark Executor Task Metric name Short description
executorRunTime Elapsed time the executor spent running this task. This includes time fetching shuffle data. The value is expressed in milliseconds.
executorCpuTime CPU time the executor spent running this task. This includes time fetching shuffle data. The value is expressed in nanoseconds.
executorDeserializeTime Elapsed time spent to deserialize this task. The value is expressed in milliseconds.
executorDeserializeCpuTime CPU time taken on the executor to deserialize this task. The value is expressed in nanoseconds.
resultSize The number of bytes this task transmitted back to the driver as the TaskResult.
jvmGCTime Elapsed time the JVM spent in garbage collection while executing this task. The value is expressed in milliseconds.
resultSerializationTime Elapsed time spent serializing the task result. The value is expressed in milliseconds.
memoryBytesSpilled The number of in-memory bytes spilled by this task.
diskBytesSpilled The number of on-disk bytes spilled by this task.
peakExecutionMemory Peak memory used by internal data structures created during shuffles, aggregations and joins. The value of this accumulator should be approximately the sum of the peak sizes across all such data structures created in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort.
inputMetrics.* Metrics related to reading data from [[org.apache.spark.rdd.HadoopRDD]] or from persisted data.
    .bytesRead Total number of bytes read.
    .recordsRead Total number of records read.
outputMetrics.* Metrics related to writing data externally (e.g. to a distributed filesystem), defined only in tasks with output.
    .bytesWritten Total number of bytes written
    .recordsWritten Total number of records written
shuffleReadMetrics.* Metrics related to shuffle read operations.
    .recordsRead Number of records read in shuffle operations
    .remoteBlocksFetched Number of remote blocks fetched in shuffle operations
    .localBlocksFetched Number of local (as opposed to read from a remote executor) blocks fetched in shuffle operations
    .totalBlocksFetched Number of blocks fetched in shuffle operations (both local and remote)
    .remoteBytesRead Number of remote bytes read in shuffle operations
    .localBytesRead Number of bytes read in shuffle operations from local disk (as opposed to read from a remote executor)
    .totalBytesRead Number of bytes read in shuffle operations (both local and remote)
    .remoteBytesReadToDisk Number of remote bytes read to disk in shuffle operations. Large blocks are fetched to disk in shuffle read operations, as opposed to being read into memory, which is the default behavior.
    .fetchWaitTime Time the task spent waiting for remote shuffle blocks. This only includes the time blocking on shuffle input data. For instance if block B is being fetched while the task is still not finished processing block A, it is not considered to be blocking on block B. The value is expressed in milliseconds.
shuffleWriteMetrics.* Metrics related to operations writing shuffle data.
    .bytesWritten Number of bytes written in shuffle operations
    .recordsWritten Number of records written in shuffle operations
    .writeTime Time spent blocking on writes to disk or buffer cache. The value is expressed in nanoseconds.

Executor Metrics

Executor-level metrics are sent from each executor to the driver as part of the Heartbeat to describe the performance metrics of Executor itself like JVM heap memory, GC infomation. Metrics peakExecutorMetrics.* are only enabled if spark.eventLog.logStageExecutorMetrics.enabled is true. A list of the available metrics, with a short description:

Executor Level Metric name Short description
totalGCTime Elapsed time the JVM spent in garbage collection summed in this Executor. The value is expressed in milliseconds.
totalInputBytes Total input bytes summed in this Executor.
totalShuffleRead Total shuffer read bytes summed in this Executor.
totalShuffleWrite Total shuffer write bytes summed in this Executor.
maxMemory Total amount of memory available for storage, in bytes.
memoryMetrics.* Current value of memory metrics:
    .usedOnHeapStorageMemory Used on heap memory currently for storage, in bytes.
    .usedOffHeapStorageMemory Used off heap memory currently for storage, in bytes.
    .totalOnHeapStorageMemory Total available on heap memory for storage, in bytes. This amount can vary over time, on the MemoryManager implementation.
    .totalOffHeapStorageMemory Total available off heap memory for storage, in bytes. This amount can vary over time, depending on the MemoryManager implementation.
peakMemoryMetrics.* Peak value of memory (and GC) metrics:
    .JVMHeapMemory Peak memory usage of the heap that is used for object allocation. The heap consists of one or more memory pools. The used and committed size of the returned memory usage is the sum of those values of all heap memory pools whereas the init and max size of the returned memory usage represents the setting of the heap memory which may not be the sum of those of all heap memory pools. The amount of used memory in the returned memory usage is the amount of memory occupied by both live objects and garbage objects that have not been collected, if any.
    .JVMOffHeapMemory Peak memory usage of non-heap memory that is used by the Java virtual machine. The non-heap memory consists of one or more memory pools. The used and committed size of the returned memory usage is the sum of those values of all non-heap memory pools whereas the init and max size of the returned memory usage represents the setting of the non-heap memory which may not be the sum of those of all non-heap memory pools.
    .OnHeapExecutionMemory Peak on heap execution memory in use, in bytes.
    .OffHeapExecutionMemory Peak off heap execution memory in use, in bytes.
    .OnHeapStorageMemory Peak on heap storage memory in use, in bytes.
    .OffHeapStorageMemory Peak off heap storage memory in use, in bytes.
    .OnHeapUnifiedMemory Peak on heap memory (execution and storage).
    .OffHeapUnifiedMemory Peak off heap memory (execution and storage).
    .DirectPoolMemory Peak memory that the JVM is using for direct buffer pool ([[java.lang.management.BufferPoolMXBean]])
    .MappedPoolMemory Peak memory that the JVM is using for mapped buffer pool ([[java.lang.management.BufferPoolMXBean]])
    .ProcessTreeJVMVMemory Virtual memory size in bytes. Enabled if spark.eventLog.logStageExecutorProcessTreeMetrics.enabled is true.
    .ProcessTreeJVMRSSMemory Resident Set Size: number of pages the process has in real memory. This is just the pages which count toward text, data, or stack space. This does not include pages which have not been demand-loaded in, or which are swapped out. Enabled if spark.eventLog.logStageExecutorProcessTreeMetrics.enabled is true.
    .ProcessTreePythonVMemory Virtual memory size for Python in bytes. Enabled if spark.eventLog.logStageExecutorProcessTreeMetrics.enabled is true.
    .ProcessTreePythonRSSMemory Resident Set Size for Python. Enabled if spark.eventLog.logStageExecutorProcessTreeMetrics.enabled is true.
    .ProcessTreeOtherVMemory Virtual memory size for other kind of process in bytes. Enabled if spark.eventLog.logStageExecutorProcessTreeMetrics.enabled is true.
    .ProcessTreeOtherRSSMemory Resident Set Size for other kind of process. Enabled if spark.eventLog.logStageExecutorProcessTreeMetrics.enabled is true.
    .MinorGCCount Total minor GC count. For example, the garbage collector is one of Copy, PS Scavenge, ParNew, G1 Young Generation and so on.
    .MinorGCTime Elapsed total minor GC time. The value is expressed in milliseconds.
    .MajorGCCount Total major GC count. For example, the garbage collector is one of MarkSweepCompact, PS MarkSweep, ConcurrentMarkSweep, G1 Old Generation and so on.
    .MajorGCTime Elapsed total major GC time. The value is expressed in milliseconds.

The computation of RSS and Vmem are based on proc(5)

API Versioning Policy

These endpoints have been strongly versioned to make it easier to develop applications on top. In particular, Spark guarantees:

Note that even when examining the UI of running applications, the applications/[app-id] portion is still required, though there is only one application available. Eg. to see the list of jobs for the running app, you would go to http://localhost:4040/api/v1/applications/[app-id]/jobs. This is to keep the paths consistent in both modes.

Metrics

Spark has a configurable metrics system based on the Dropwizard Metrics Library. This allows users to report Spark metrics to a variety of sinks including HTTP, JMX, and CSV files. The metrics are generated by sources embedded in the Spark code base. They provide instrumentation for specific activities and Spark components. The metrics system is configured via a configuration file that Spark expects to be present at $SPARK_HOME/conf/metrics.properties. A custom file location can be specified via the spark.metrics.conf configuration property. Instead of using the configuration file, a set of configuration parameters with prefix spark.metrics.conf. can be used. By default, the root namespace used for driver or executor metrics is the value of spark.app.id. However, often times, users want to be able to track the metrics across apps for driver and executors, which is hard to do with application ID (i.e. spark.app.id) since it changes with every invocation of the app. For such use cases, a custom namespace can be specified for metrics reporting using spark.metrics.namespace configuration property. If, say, users wanted to set the metrics namespace to the name of the application, they can set the spark.metrics.namespace property to a value like ${spark.app.name}. This value is then expanded appropriately by Spark and is used as the root namespace of the metrics system. Non-driver and executor metrics are never prefixed with spark.app.id, nor does the spark.metrics.namespace property have any such affect on such metrics.

Spark’s metrics are decoupled into different instances corresponding to Spark components. Within each instance, you can configure a set of sinks to which metrics are reported. The following instances are currently supported:

Each instance can report to zero or more sinks. Sinks are contained in the org.apache.spark.metrics.sink package:

Spark also supports a Ganglia sink which is not included in the default build due to licensing restrictions:

To install the GangliaSink you’ll need to perform a custom build of Spark. Note that by embedding this library you will include LGPL-licensed code in your Spark package. For sbt users, set the SPARK_GANGLIA_LGPL environment variable before building. For Maven users, enable the -Pspark-ganglia-lgpl profile. In addition to modifying the cluster’s Spark build user applications will need to link to the spark-ganglia-lgpl artifact.

The syntax of the metrics configuration file and the parameters available for each sink are defined in an example configuration file, $SPARK_HOME/conf/metrics.properties.template.

When using Spark configuration parameters instead of the metrics configuration file, the relevant parameter names are composed by the prefix spark.metrics.conf. followed by the configuration details, i.e. the parameters take the following form: spark.metrics.conf.[instance|*].sink.[sink_name].[parameter_name]. This example shows a list of Spark configuration parameters for a Graphite sink:

"spark.metrics.conf.*.sink.graphite.class"="org.apache.spark.metrics.sink.GraphiteSink"
"spark.metrics.conf.*.sink.graphite.host"="graphiteEndPoint_hostName>"
"spark.metrics.conf.*.sink.graphite.port"=<graphite_listening_port>
"spark.metrics.conf.*.sink.graphite.period"=10
"spark.metrics.conf.*.sink.graphite.unit"=seconds
"spark.metrics.conf.*.sink.graphite.prefix"="optional_prefix"
"spark.metrics.conf.*.sink.graphite.regex"="optional_regex_to_send_matching_metrics"

Default values of the Spark metrics configuration are as follows:

"*.sink.servlet.class" = "org.apache.spark.metrics.sink.MetricsServlet"
"*.sink.servlet.path" = "/metrics/json"
"master.sink.servlet.path" = "/metrics/master/json"
"applications.sink.servlet.path" = "/metrics/applications/json"

Additional sources can be configured using the metrics configuration file or the configuration parameter spark.metrics.conf.[component_name].source.jvm.class=[source_name]. At present the JVM source is the only available optional source. For example the following configuration parameter activates the JVM source: "spark.metrics.conf.*.source.jvm.class"="org.apache.spark.metrics.source.JvmSource"

List of available metrics providers

Metrics used by Spark are of multiple types: gauge, counter, histogram, meter and timer, see Dropwizard library documentation for details. The following list of components and metrics reports the name and some details about the available metrics, grouped per component instance and source namespace. The most common time of metrics used in Spark instrumentation are gauges and counters. Counters can be recognized as they have the .count suffix. Timers, meters and histograms are annotated in the list, the rest of the list elements are metrics of type gauge. The large majority of metrics are active as soon as their parent component instance is configured, some metrics require also to be enabled via an additional configuration parameter, the details are reported in the list.

Component instance = Driver

This is the component with the largest amount of instrumented metrics

Component instance = Executor

These metrics are exposed by Spark executors. Note, currently they are not available when running in local mode.

Source = JVM Source

Notes:

Component instance = applicationMaster

Note: applies when running on YARN

Component instance = mesos_cluster

Note: applies when running on mesos

Component instance = master

Note: applies when running in Spark standalone as master

Component instance = ApplicationSource

Note: applies when running in Spark standalone as master

Component instance = worker

Note: applies when running in Spark standalone as worker

Component instance = shuffleService

Note: applies to the shuffle service

Advanced Instrumentation

Several external tools can be used to help profile the performance of Spark jobs:

Spark also provides a plugin API so that custom instrumentation code can be added to Spark applications. There are two configuration keys available for loading plugins into Spark:

Both take a comma-separated list of class names that implement the org.apache.spark.api.plugin.SparkPlugin interface. The two names exist so that it’s possible for one list to be placed in the Spark default config file, allowing users to easily add other plugins from the command line without overwriting the config file’s list. Duplicate plugins are ignored.

Distribution of the jar files containing the plugin code is currently not done by Spark. The user or admin should make sure that the jar files are available to Spark applications, for example, by including the plugin jar with the Spark distribution. The exception to this rule is the YARN backend, where the --jars command line option (or equivalent config entry) can be used to make the plugin code available to both executors and cluster-mode drivers.