Production-Ready Spark Streaming Part I

Venkatesh Iyer
Published in Split Brain
Jul 5, 2017 · 12 min read

One of the projects I work on is a Near Real Time events ingestion and processing pipeline built using Spark Streaming. It took quite an effort to put all the bells and whistles in place to make it production ready, which I’ll talk about in this post.

TL;DR: There’s no TLDR. This is a longish post intended to serve as my personal notes as well as a how-to guide for anyone attempting similar stuff.

System Overview

The data infrastructure at Thumbtack is built on Google Cloud Platform. Events from various services are emitted to a Fluentd service, which acts as a router that forwards those messages to multiple destinations. In this case, events are sent to Google Pubsub topics for stream processing and to AWS S3 for batch processing & backup (use of S3 is a relic of a previous generation of data infrastructure at Thumbtack, and continuing to tee off data there gave us a very robust migration path. It may eventually be deprecated in favor of GCS).

Spark Streaming jobs run on Google Dataproc clusters; Dataproc provides a managed Hadoop + Spark instance. Since the jobs need to pull in events from Google Pubsub, we use a custom receiver implementation.

Pre-reads

Spark Streaming documentation lays out the necessary configuration for running a fault tolerant streaming job. There are several talks / videos from the authors themselves on this topic. The basic guidelines are all followed to the letter: enabling Write Ahead Logs (WAL), enabling checkpointing, and ensuring incoming messages are stored in the WAL before acknowledging them to the source.

There are a few other blog posts that came in very handy with some Spark / Yarn tuning suggestions. I’ve borrowed some of the tips and tweaks from them.

Since we run on Google Dataproc, the problems with hardware, HDFS, Yarn et al. are abstracted away. The realm of failures here is in the streaming application + network services domain.

Production readiness checklist #1: Stability

Before tuning the jobs for performance (latency or throughput) it is important to have at least one set of configuration values that ensures the stability of a long running streaming job. Stability essentially translates into two factors:

  1. Have the batch processing time less than the batch interval: or else input data will keep getting queued up in the WAL, making the recovery from checkpoint on a restart extremely slow. And if there’s a code change needed in this period, you will have to throw away the checkpoint data and recover this hole in the data some other way.
  2. Handle failures: catch all exceptions in receivers and output operations, retry a sufficient number of times and if still failing, give up deterministically and page somebody (or move on, but still page somebody).
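To see why the first factor matters, here is a rough back-of-the-envelope model. It assumes one batch arrives per interval and batches are processed strictly one at a time; the object and function names are made up for illustration.

```scala
object BacklogModel {
  /**
   * Number of batches not yet completed after `batches` intervals have elapsed,
   * assuming one batch arrives per interval and batches are processed serially.
   */
  def backlogAfter(batches: Int, batchIntervalMs: Long, processingTimeMs: Long): Long = {
    require(batches >= 0 && batchIntervalMs > 0 && processingTimeMs > 0)
    val elapsedMs = batches.toLong * batchIntervalMs
    // we cannot complete more batches than have arrived
    val completed = math.min(batches.toLong, elapsedMs / processingTimeMs)
    batches - completed
  }
}
```

With a 1 second interval and 1.5 seconds of processing per batch, a third of all batches are still pending after a minute, and the backlog keeps growing for as long as the job runs.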

Tuning for lower processing time

Spark Streaming is a micro-batching framework, where the batch interval can be specified at the time of creating the streaming context:

val ssc = new StreamingContext(conf, Seconds(x))

which means a new batch will be created every x seconds. The parallelism for each batch is governed by the configuration setting:

spark.streaming.blockInterval = 200ms (default)
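Each receiver cuts its input into one block per block interval, and each block becomes one task, so the number of tasks in a batch is roughly the batch interval divided by the block interval, per receiver. A small sketch of that arithmetic (the helper name is hypothetical, and the result is an approximation, not an exact Spark guarantee):

```scala
object BlockMath {
  /** Approximate number of blocks (and hence tasks) generated for one batch. */
  def tasksPerBatch(batchIntervalMs: Long, blockIntervalMs: Long, numReceivers: Int = 1): Long = {
    require(batchIntervalMs > 0 && blockIntervalMs > 0 && numReceivers > 0)
    (batchIntervalMs / blockIntervalMs) * numReceivers
  }
}
```

For example, a 1 second batch with the default 200ms block interval gives about 5 tasks per receiver, which caps how many cores a single-receiver batch can keep busy.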

It is possible that by playing around with different values for these two settings, you may find a combination that ensures that a batch is processed before the next one arrives.

Or you may not.

So now we have to try out a few options to speed up the processing of a single batch. Here are a few tunings that helped in my case:

  1. Process multiple batches concurrently by setting:
spark.streaming.concurrentJobs = n

Note that if you care about ordering of messages you cannot use this. In our case, we don’t care about ordering (we couldn’t have used Google Pubsub either if we did)

This straightaway gives a processing time headroom by a factor of n.

2. Process multiple partitions of a batch in parallel by using foreachPartitionAsync. Spark cluster scheduling needs to run in FAIR mode for this to take effect:

spark.scheduler.mode = FAIR

This can be useful for applications that care for ordering, provided the ordering requirement is partial (say only updates for a given key need to be ordered, updates to different keys can happen in any order)

Dataproc note: this setting has to be applied while spinning up the cluster, and not while submitting a job.

3. Even with one of the above two settings, you may find that tasks are asymmetrically queued up on a couple of hot executors while the others sit idle. This is because Spark is tuned to favor data locality. By default, Spark waits for 3s at each locality level before falling back to the next one (process-local, then node-local, then rack-local, then any).

This is suitable for massive batch processing but for streaming, the data in each batch is small enough that it is inexpensive to move it around; and the long tail latency will almost always be from the output operation. Disable data locality settings using:

spark.locality.wait = 0s

4. So now we get to a state where, for a calibrated input rate, the job remains stable. But what if there is a sudden surge in input rate: the batch interval and block interval remain the same, there are just a LOT more messages per interval per partition to process?

Spark Streaming can limit the receiver rate with this setting:

spark.streaming.receiver.maxRate = 3000

What if the output operations are temporarily slower than usual? Spark can use the signals from output operations and increase/reduce the receiver rate to provide “backpressure”, which can be enabled using the setting:

spark.streaming.backpressure.enabled = true

Awesome!

But …

If you’re implementing a custom receiver, you can either get “reliability” (durability, to be precise) or rate-limiting, but not both.

Ok, let’s implement our own Block Generator then and control the rate there (which is what the Kafka receiver does).

But …

The function to register a custom block generator is in a package private class in org.apache.spark.streaming (and I do not want to do any mvn/sbt magic to extend a private API).

So I ended up with a static rate limiter which limits the rate at which each receiver receives messages from Pubsub. There is unfortunately no backpressure feature in my implementation, because there is no public interface to communicate a modified rate to the receiver from either the driver or other executors. See this post for the receiver implementation details.
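This is not the actual receiver code, just a minimal sketch of what a static limiter can look like, assuming a simple fixed one-second window. The class name and the injectable clock are my own choices for illustration; a token bucket or a library rate limiter would work just as well.

```scala
/**
 * Hypothetical static rate limiter: grants at most `maxPerSecond` permits
 * in any one-second window. The clock is injectable to keep it testable.
 */
final class StaticRateLimiter(
    maxPerSecond: Int,
    now: () => Long = () => System.currentTimeMillis()
) {
  require(maxPerSecond > 0, "maxPerSecond must be positive")

  private var windowStart: Long = now()
  private var used: Int = 0

  /** Returns true if one more message may be pulled now, false if the caller should back off. */
  def tryAcquire(): Boolean = synchronized {
    val t = now()
    if (t - windowStart >= 1000L) {
      // a new window has started: reset the counter
      windowStart = t
      used = 0
    }
    if (used < maxPerSecond) {
      used += 1
      true
    } else {
      false
    }
  }
}
```

The receiver loop would call tryAcquire() before each pull and sleep briefly when it returns false. Since the limit is per receiver, the effective input rate is the limit multiplied by the number of receivers.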

(NOTE: If you’re using Kafka receiver or the new Direct Stream approach, you get both rate limiting and backpressure)
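For reference, the settings discussed in this section can be collected in one place. This is only a sketch: the values are the ones mentioned above (or placeholders) and need to be calibrated per job, the object and method names are made up, and on Dataproc the scheduler mode has to be set at cluster creation as noted earlier.

```scala
import org.apache.spark.SparkConf

object StreamingTuning {
  // Illustrative values only; calibrate them for your own job.
  def tunedConf(appName: String): SparkConf =
    new SparkConf()
      .setAppName(appName)
      // durability: store received data in the write ahead log
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      // parallelism within a batch
      .set("spark.streaming.blockInterval", "200ms")
      // process several batches at once (only if ordering does not matter)
      .set("spark.streaming.concurrentJobs", "4")
      // needed for foreachPartitionAsync to run partitions in parallel
      .set("spark.scheduler.mode", "FAIR")
      // do not wait for a local executor to free up
      .set("spark.locality.wait", "0s")
      // static cap on records per second per receiver
      .set("spark.streaming.receiver.maxRate", "3000")
      // has no effect on a custom receiver that bypasses Spark's rate limiter
      .set("spark.streaming.backpressure.enabled", "true")
}
```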

Handling Failures

First response to failure: retry.

First set of retries should be done in the application code itself. Let’s say that an output operation is retried 3 times, after which the exception is thrown upstream.

Spark attempts a task (i.e. output operation on one partition of a batch) up to 4 times by default before giving up. This can be increased with the setting:

spark.task.maxFailures = 6

(so in effect the output operation is attempted up to 3 × 6 = 18 times now)

When a task fails maxFailures times, the job (processing of a single batch) is aborted.
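The application-level retry can be a small helper wrapped around the output operation. A minimal sketch, assuming immediate retries with no backoff (the object and method names are hypothetical; a real job would probably add a delay between attempts):

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object Retry {
  /** Runs `op` up to `attempts` times and returns the first success or the last failure. */
  @tailrec
  def retry[T](attempts: Int)(op: => T): Try[T] = {
    require(attempts >= 1, "attempts must be at least 1")
    Try(op) match {
      case s @ Success(_)                 => s
      case Failure(_) if attempts > 1     => retry(attempts - 1)(op)
      case f @ Failure(_)                 => f
    }
  }
}
```

Calling `Retry.retry(3)(outputOperation(batch)).get` inside the task rethrows the last exception once all 3 attempts fail, at which point Spark's own task-level retries take over.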

In streaming jobs, there can be two approaches towards handling failures:

  1. Log and alert on failure of all retries, optionally backup the failed batch, and keep running (e.g. metrics aggregation jobs).
  2. Log and alert on failure of all retries, and abort the application (e.g. streaming ETL jobs)

There is actually a 3rd approach - retry infinitely, but in this case it is difficult to figure out when to escalate for external intervention. Also with no backpressure, the receiver might as well continue receiving data and queuing it up in the WAL, which means the recovery can get complicated (especially if the failure was due to a bug and now a code change is necessary). It is a lot simpler to just have the backlog build in the messaging application, be it Kafka or Google Pubsub. If the data pipeline has to be stalled until a batch is successful, I’d argue that option #2 is a better approach.

Option #1 is how Spark Streaming behaves by default: the batch associated with a failed job is discarded, and the application moves on. Backup of the failed batch, if desired, needs to be handled within the task using a recoverWith block.
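A backup step with recoverWith could look roughly like this. It is a sketch under assumptions: `output` and `backup` are hypothetical stand-ins for the real sink and for wherever failed batches get parked (a GCS bucket, a dead-letter topic, etc.).

```scala
import scala.util.Try
import scala.util.control.NonFatal

object BatchBackup {
  /**
   * Tries the output operation; if it fails, tries to back up the batch instead.
   * The result is a Failure only if the backup fails as well.
   */
  def writeWithBackup[A](batch: List[A])(output: List[A] => Unit)(backup: List[A] => Unit): Try[Unit] =
    Try(output(batch)).recoverWith {
      case NonFatal(_) =>
        // log and alert here before falling back to the backup
        Try(backup(batch))
    }
}
```

Because the task now completes successfully when the backup works, the streaming job keeps running, which is exactly the option #1 behavior.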

To implement option #2 we need to call stop() on the StreamingContext in the driver (stopping StreamingContext on executors does not throw any exceptions, but does not take effect. Haven’t grokked the Spark code yet to see why)

One important thing to know is that the code within foreachRDD executes as a mini-driver program. So as long as the exceptions are propagated to this level, they can be caught:

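events.foreachRDD { r =>
  // run the output operation on each partition asynchronously
  val future = r.foreachPartitionAsync { p =>
    outputOperation(p.toList)
  }
  // Await.result rethrows the job's exception, so capture it in a Try
  // (needs scala.util.Try, scala.concurrent.Await, scala.concurrent.duration.Duration)
  val result = Try(Await.result(future, Duration.Inf))
  if (result.isFailure) {
    logger.error("oh no")
    ssc.stop()
  }
}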

This still doesn’t work. If you’ve enabled checkpointing, Spark will complain that it cannot serialize ssc, even if you mark it as transient. Thankfully there is a static factory method to get around that:

Replace
ssc.stop()
with
StreamingContext.getActive().get.stop()

Ok, so we’ve covered the failure of output operations. But what happens when the receiver fails? Even if you throw an exception from the receiver thread and terminate it, Spark ReceiverTracker will restart it, infinitely.

Now this can be argued to be a desirable behavior: if the input source is having trouble, keep retrying until it is available again. IMHO this should be done within the receiver loop itself, without crashing that thread. Instead, when the receiver thread crashes, it should be indicative of something seriously wrong, like not being able to commit to the WAL anymore. And in that case, I want to signal the driver to abort the application, page the on-call and investigate what went wrong.

But …

There is no direct way to control the lifecycle of a receiver. There is a StreamingListener (marked “DeveloperApi”) which can be leveraged here though.

class StreamingErrorHandler extends StreamingListener {
  /** Called when a receiver has reported an error */
  override def onReceiverError(
    receiverError: StreamingListenerReceiverError
  ): Unit = {
    StreamingContext.getActive().get.stop()
  }
}

This doesn’t work either, because Spark prohibits calling StreamingContext.stop() in the listener thread. So that call needs to be delegated to another thread.

And now that we have a listener to handle receiver failures, we can leverage the same for output operation failures as well, instead of having two different places to call ssc.stop() from. Here’s what the listener code looks like after those changes:

// streaming listener
import org.apache.spark.streaming.scheduler.{StreamingListener,
  StreamingListenerBatchCompleted, StreamingListenerReceiverError}
import org.slf4j.{Logger, LoggerFactory}

class StreamingErrorHandler extends StreamingListener {
  // written by the listener thread, read by the monitor thread
  @volatile private var shouldAbortFlag: Boolean = false
  val logger: Logger = LoggerFactory.getLogger(this.getClass)

  /** Called when a receiver has reported an error */
  override def onReceiverError(
    e: StreamingListenerReceiverError
  ): Unit = {
    logger.error(s"Rcvr error: ${e.receiverInfo.lastErrorMessage}")
    this.shouldAbortFlag = true
  }

  /** Called when processing of a batch of jobs has completed. */
  override def onBatchCompleted(
    batchCompleted: StreamingListenerBatchCompleted
  ): Unit = {
    batchCompleted
      .batchInfo
      .outputOperationInfos
      .values
      .foreach { ooi =>
        if (ooi.failureReason.nonEmpty) {
          logger.error(s"Batch failed with error: ${ooi.failureReason.get}")
          this.shouldAbortFlag = true
        }
      }
  }

  def shouldAbort: Boolean = shouldAbortFlag
}

The main class should have these methods:

// main: recover streaming context from checkpoint if it exists,
// else create a new one
def main(args: Array[String]): Unit = {
  val ssc = StreamingContext.getOrCreate(
    getCheckPointPath, () => createStreamingContext())

  val err = new StreamingErrorHandler
  // register the listener
  ssc.addStreamingListener(err)

  // start a thread to call ssc.stop() when the listener says so
  monitorApplication(err)

  ssc.start()
  ssc.awaitTermination()
}

// create streaming context with checkpoint
def createStreamingContext(): StreamingContext = {
  val ssc = new StreamingContext(conf, Seconds(x))

  // define application logic on the context
  myApplication(ssc)

  ssc.checkpoint(checkPointPath)
  ssc
}

// a thread spun off to call ssc.stop when it's time
def monitorApplication(err: StreamingErrorHandler): Unit = {
  new Thread("monitor-application") {
    override def run(): Unit = {
      while (!err.shouldAbort) {
        Thread.sleep(x * 1000)
      }
      logger.error("Aborting application due to error")
      abort()
    }
  }.start()
}

// abort helper
def abort(): Unit = {
  StreamingContext
    .getActive()
    .fold(
      // This should never happen, abort is called from a
      // running streaming application which would ensure that
      // streaming context is available. But probably worth setting
      // up a Kibana/Stackdriver logging alert
      logger.error("No active streaming context found")
    )(_.stop())
}

// user application
def myApplication(ssc: StreamingContext): Unit = {
  val events = inputStreamFromPubsub()
  events.foreachRDD { r =>
    val future = r.foreachPartitionAsync { p =>
      outputOperation(p.toList)
    }
    // Await.result rethrows the failure of the async job, if any
    try Await.result(future, Duration.Inf)
    catch {
      case e: Exception =>
        logger.error("oh no", e)
        // throw from here so that it gets picked
        // up by the listener
        throw e
    }
  }
}

NOTE: alternatively, StreamingContext#stop() can also be called from the listener class in a separate thread instead of spinning on a volatile flag.
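That alternative could look roughly like the sketch below. The class name is made up, and only receiver errors are handled here to keep it short; the AtomicBoolean makes sure repeated errors don't spawn multiple stopper threads.

```scala
import java.util.concurrent.atomic.AtomicBoolean

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverError}

// Hypothetical listener that stops the streaming context from a separate
// thread, since stop() must not be called on the listener thread itself.
class StopOnReceiverError extends StreamingListener {
  private val stopping = new AtomicBoolean(false)

  override def onReceiverError(e: StreamingListenerReceiverError): Unit = {
    // only the first reported error triggers the stop
    if (stopping.compareAndSet(false, true)) {
      val stopper = new Thread("stop-streaming-context") {
        override def run(): Unit =
          StreamingContext.getActive().foreach(_.stop())
      }
      stopper.start()
    }
  }
}
```

It is registered the same way as before, with ssc.addStreamingListener(new StopOnReceiverError). The trade-off is that the abort decision is now buried in the listener instead of being visible in the main class.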

This also makes it possible to have an external daemon-monitor that can act as systemd for the streaming jobs (health check the jobs, have a configurable number of restarts, integration with Slack/PagerDuty etc.). Google Dataproc supports restartable jobs, which can serve this purpose as well.

Some Dataproc Eccentricities

If you’re using Google Dataproc, here are a couple of pitfalls to avoid:

Don’t save checkpoint to GCS, save it in HDFS instead: Dataproc clusters can be spun up and down easily. In order to save money, you may want to have the cluster up only as long as the job is running. In that case, you’d want to store the checkpoint in GCS because now you may have to recover the job on another cluster. Spark supports any HDFS compatible storage for checkpointing as well.

But …

This doesn’t really scale well (even with <1000 events per second). I am not sure whether GCS doesn’t handle it well, or if the Spark WAL implementation is not optimized well enough for remote checkpoints. It doesn’t help that the fsync behavior of the Hadoop adaptors for GCS (and S3) is a bit obtuse. You are better off saving the checkpoints to HDFS itself, and ensuring that the clusters running streaming jobs are not deleted as long as there is any checkpoint data to recover from.

And don’t put the checkpoint in hdfs:///tmp/: duh! There seems to be some automated cleanup from time to time (of course!) that will crash the streaming job.

Production readiness checklist #2: Performance

There are two dimensions for improving performance: Latency and Throughput. There is a trade-off between the two, so you typically pick which is more crucial to your application, tune it for that one first and then try and get the other as good as possible.

Latency

Spark Streaming is not suited for sub-second latency (for that you’re better off using a framework that processes every event rather than small batches of events). The expectation should be to have a few seconds of latency.

I use a 1 second batch interval across many of the jobs. The parallelism techniques mentioned above scale horizontally: add more executors and run more batches in parallel. A value of 4 concurrent jobs serves my purpose well.

Throughput

Since the output operations scale horizontally with cluster size, the bottleneck shifts to the receiver: there is only so much a single threaded receiver can pull.

The receiver design allows spinning up multiple receivers and taking a union of the streams from all of them for downstream processing. A few helper methods like these can help in creating multiple receivers for a subscription, or multiple receivers for multiple subscriptions, and union them all together as a single input source:

// a single receiver for a single subscription
def createStream(
  ssc: StreamingContext,
  sub: CloudPubSubSubscription,
  config: CloudPubSubConfig
): DStream[CloudPubSubMessage] = {
  new CloudPubSubInputDStream(ssc, sub, config)
}

// multiple receivers for a single subscription
def createStream(
  ssc: StreamingContext,
  sub: CloudPubSubSubscription,
  config: CloudPubSubConfig,
  numReceivers: Int
): DStream[CloudPubSubMessage] = {
  (1 to numReceivers map { i => createStream(ssc, sub, config) })
    .reduce(_ union _)
}

// multiple receivers for each of multiple subscriptions
def createStream(
  ssc: StreamingContext,
  subs: Seq[CloudPubSubSubscription],
  config: CloudPubSubConfig,
  numReceivers: Int = 1
): DStream[CloudPubSubMessage] = {
  subs.map { sub => createStream(ssc, sub, config, numReceivers) }
    .reduce(_ union _)
}

Databricks Wishlist

The dramatically accentuated “But …” lines are my wishlist items from Databricks:

  • Provide a mechanism to have rate limiting and backpressure for custom receivers
  • Make the block generator registering function a public API
  • Label StreamingListener as Production (from Developer API)
  • Publish benchmark numbers and optimal config options to use S3 or GCS as checkpoint stores

Although I doubt any of these will be worked upon. The DStream streaming API is already 2+ years old yet a few things are not Production-class; they’ve already moved on to Structured Streaming now, but that is still Alpha, making users wary of using it in their production systems :( </rant>

Summary

Stability is the most important aspect for a Spark streaming job to be labelled production ready, and it can be achieved by a) ensuring that streaming batches are processed faster than they arrive and b) handling failures well. To speed up processing we can increase the job and task level parallelism, ensure that all executors are utilized and add rate limits so that the job operates within its calibrated bounds. Failures should be handled by having an adequate number of retries and taking a deterministic fail-app/drop-batch step when all retries fail.

Receiver throughput can be bumped up by spinning up multiple receivers and passing a union of all input streams downstream. Processing throughput can be horizontally scaled with cluster size with more concurrent jobs. Latency can be as low as the streaming interval can be reduced to without compromising the stability aspect.

A follow up post covers the next item in the production readiness checklist: monitoring.
