Production-Ready Spark Streaming Part II

Venkatesh Iyer
Published in Split Brain
Jul 7, 2017


In Part I of this series, I talked about ensuring the stability of a Spark Streaming job, which is the first checklist item for making it production-ready, along with a few additional points about performance. The next item is monitoring.

Production readiness checklist #3: Monitoring

I use the term monitoring to refer to both metrics and alerting, because typically we build alerting on metric thresholds.

Pre-reads

When it comes to monitoring Spark Streaming applications, all the content (blogs/videos) revolves around these two root documents:

This still leaves us with two open questions:

  1. How do we monitor application metrics? Is there a way to piggyback onto Spark’s metrics registry, or do we instantiate our own? If we have our own metrics registry instance, how do we start a reporter thread for it, given that there is no support for running an initialization function across all drivers and executors (other than a singleton, which cannot be parameterized)?
  2. What if we want to write our own Sink, say for InfluxDB? The Sink trait is package-private (and again, I don’t want to do any mvn/sbt magic; if an interface is extensible, it should be public).

Solution

Here’s the high-level outline of the solution used:

  • Use another instance of metrics registry for application metrics
  • Instantiate the registry and its reporter on each executor. The trick is to attempt to instantiate the reporter on every invocation that records a metric (meter, counter, gauge, etc.). The first attempt in every JVM (i.e. in every executor) does the actual instantiation, and subsequent attempts reuse the existing instance.
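The once-per-JVM trick from the second bullet can be sketched in isolation using only the JDK. This is a minimal illustration, not the job's actual code: `Resource`, `LazyPerJvm` and `initCount` are hypothetical names, and `Resource` stands in for the registry plus its reporter.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.{Function => JFunction}

// Hypothetical stand-in for an expensive per-JVM resource
// (for example, a metrics registry together with its reporter).
final class Resource(val name: String)

object LazyPerJvm {
  // Counts how often the factory actually ran; for illustration only.
  val initCount = new AtomicInteger(0)

  private val instances = new ConcurrentHashMap[String, Resource]()

  private val create = new JFunction[String, Resource] {
    override def apply(key: String): Resource = {
      initCount.incrementAndGet()
      new Resource(key)
    }
  }

  // Every call attempts the creation; computeIfAbsent runs the factory
  // at most once per key and returns the existing instance afterwards.
  def get(key: String): Resource = instances.computeIfAbsent(key, create)
}
```

Because a Scala `object` is initialized once per JVM, each executor ends up with its own map and therefore its own instance per key, no matter how many tasks call `get` concurrently.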

Code Snippets

Class encapsulating the Dropwizard registry and InfluxDB Reporter

import java.util.concurrent.{ConcurrentHashMap, TimeUnit}

// Assumed sources of mapAsJavaMap, ImmutableMap and ImmutableSet;
// the original snippet omitted these imports.
import scala.collection.JavaConversions.mapAsJavaMap

import com.codahale.metrics.{MetricRegistry, ScheduledReporter}
import com.google.common.collect.{ImmutableMap, ImmutableSet}
import com.izettle.metrics.dw.InfluxDbReporterFactory

// Config and RetryUtil are application classes that are not shown here.
private[metrics] case class InfluxMetricsReporter(
  cfg: Config
) {

  private[metrics] val registry = new MetricRegistry()

  // Built eagerly, so constructing this class also starts the reporter.
  private val reporter: ScheduledReporter = init(cfg)

  private def builder(
    cfg: Config
  ): InfluxDbReporterFactory = {

    val builder = new InfluxDbReporterFactory
    builder.setProtocol("https")
    builder.setHost(cfg.dbServiceHostName)
    builder.setPort(cfg.dbServicePort)
    builder.setAuth(s"${cfg.username}:${cfg.password}")
    builder.setDatabase(cfg.dbName)
    builder.setTags(mapAsJavaMap(cfg.tags))
    // Report only a subset of fields for timers and meters.
    builder.setFields(ImmutableMap.of(
      "timers", ImmutableSet.of(
        "count", "min", "max", "mean", "p50", "p99", "m1_rate"
      ),
      "meters", ImmutableSet.of("count", "m1_rate"))
    )
    builder
  }

  private def init(
    cfg: Config
  ): ScheduledReporter = {
    val reporter = RetryUtil.retry(3) {
      builder(cfg).build(registry)
    }
    if (cfg.periodicReporting) {
      reporter.start(
        cfg.reportingIntervalInMinutes,
        TimeUnit.MINUTES
      )
    }
    reporter
  }

  /**
   * Force an immediate flush of metrics to the server.
   */
  def report(): Unit = reporter.report()
}

object InfluxMetricsReporter {
  // One reporter per distinct Config in each JVM.
  private val reporters = new ConcurrentHashMap[Config, InfluxMetricsReporter](1, 1, 1)

  private val createReporter = new java.util.function.Function[Config, InfluxMetricsReporter] {
    override def apply(
      input: Config
    ): InfluxMetricsReporter = new InfluxMetricsReporter(input)
  }

  // Creates the reporter on the first call for a given Config and reuses it afterwards.
  def get(cfg: Config): InfluxMetricsReporter = {
    reporters.computeIfAbsent(cfg, createReporter)
  }
}

A sample usage of this registry + reporter could be to create a trait to mix into the streaming job:

import com.codahale.metrics.Meter

trait SparkStreamingMonitor {
  def cfg: Config = Config(dbName = "default")
  // Looks up (or lazily creates) the reporter for cfg, then returns the named meter.
  def meter(metricName: String): Meter = {
    InfluxMetricsReporter.get(cfg).registry.meter(metricName)
  }
}
object MyStreamingJob extends BaseStreamingJob
  with SparkStreamingMonitor {
  ...
  override def cfg = Config(dbName = "prodDB")
  ...
}

Likewise, you can add wrappers around counters and gauges. The very first invocation on each executor will make the computeIfAbsent function instantiate a periodic reporter.

A few more helper methods can make it even easier to use:

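import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext

trait SparkStreamingMonitor {
  ...
  // Marks the meter once and passes the item through, so it can be used inside map().
  def meterItem[U: ClassTag](
    item: U,
    metricName: String
  ): U = {
    meter(metricName).mark()
    item
  }
  // Marks the meter with the number of records in the RDD; count() runs a Spark job.
  def meterRDD[U: ClassTag](
    rdd: RDD[U],
    metricName: String
  ): Unit = {
    meter(metricName).mark(rdd.count())
  }
}
object MyStreamingJob extends BaseStreamingJob
  with SparkStreamingMonitor {
  override def cfg = Config(dbName = "prodDB")
  def main(ssc: StreamingContext): Unit = {
    ...
    events
      .map(meterItem(_, "input_rate"))
      .map(myTransform)
      .map(meterItem(_, "processed_input_rate"))
      .foreachRDD { r =>
        // Persist so the output operation and count() do not recompute the RDD.
        r.persist()
        myOutputOperation(r)
        meterRDD(r, "output_rate")
        r.unpersist(blocking = false)
      }
  }
}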

You can add similar helpers with counters and gauges to measure errors and other custom metrics.
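As a rough sketch of what such helpers might look like, the trait below wraps a Dropwizard `Counter` and `Gauge`. It assumes the `metrics-core` library is on the classpath; `CounterAndGaugeHelpers`, `countErrors` and `gauge` are hypothetical names, and the abstract `registry` member stands in for the registry obtained from the reporter class above.

```scala
import scala.util.control.NonFatal

import com.codahale.metrics.{Counter, Gauge, MetricRegistry}

// Sketch only: `registry` is a placeholder for the application's registry.
trait CounterAndGaugeHelpers {
  def registry: MetricRegistry

  def counter(metricName: String): Counter = registry.counter(metricName)

  // Runs `body`; if it throws, increments the named counter and rethrows.
  def countErrors[T](metricName: String)(body: => T): T =
    try body
    catch {
      case NonFatal(e) =>
        counter(metricName).inc()
        throw e
    }

  // Registers a gauge that re-evaluates `read` whenever it is polled.
  // register() rejects duplicate names, so repeated calls are ignored.
  def gauge[T](metricName: String)(read: => T): Unit = {
    val g = new Gauge[T] { override def getValue: T = read }
    try registry.register(metricName, g)
    catch { case _: IllegalArgumentException => () }
  }
}
```

A transformation could then be wrapped as `countErrors("transform_errors") { myTransform(item) }`, so failures show up as a counter without changing how they propagate.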

End-to-End Monitoring System

The metrics emitted from the streaming jobs are sent to InfluxDB, plotted as graphs in Grafana, and checked against thresholds by Kapacitor, which dispatches the resulting alerts through PagerDuty. How-tos for these systems are abundantly documented.

Summary

Application-level monitoring for Spark Streaming jobs is implemented with an application-specific instance of the Dropwizard metrics registry and a reporter to InfluxDB. Both are instantiated on every executor using a computeIfAbsent approach.
