Flush support for GCS and S3 Hadoop Adaptors

Venkatesh Iyer · Published in Split Brain · Dec 15, 2016


When do the Hadoop FS adapters for GCS and S3 actually flush data to the service?

While setting up a fault-tolerant Spark Streaming job, I came across this setting: spark.streaming.driver.writeAheadLog.closeFileAfterWrite (for the driver, with a similar one for the receiver).

The doc says:

Whether to close the file after writing a write ahead log record on the driver. Set this to ‘true’ when you want to use S3 (or any file system that does not support flushing) for the metadata WAL on the driver.
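
For context, here is a minimal sketch (mine, not from the Spark docs) of a Java streaming job that enables this setting; the app name, batch interval, and checkpoint path are placeholders.

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class WalConfigSketch {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf()
            .setAppName("wal-config-sketch")
            // Enable the receiver write ahead log for fault tolerance.
            .set("spark.streaming.receiver.writeAheadLog.enable", "true")
            // Close the WAL file after each write so object stores whose Hadoop
            // adapters don't really flush (e.g. S3) still persist every record.
            .set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
            .set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
        // The driver's metadata WAL lives under the checkpoint directory.
        jssc.checkpoint("s3a://my-bucket/spark-checkpoints");
    }
}
```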

This made me wonder: the basic tenet of both S3 and GCS is durability (in fact, GCS has much stronger consistency guarantees), so what do the Spark authors mean by “does not support flushing”?

I did a bit of digging, and here’s the result: they are actually referring to the Hadoop FileSystem implementations for S3 and GCS.

GCS: GoogleHadoopSyncableOutputStream

The class Javadoc says:

On the first call to hsync()/sync(), the destination file is committed and a new temporary file using a hidden-file prefix (underscore) is created with an additional suffix which differs for each subsequent temporary file in the series; during this time readers can read the data committed to the destination file, but not the bytes written to the temporary file since the last hsync() call.

On each subsequent hsync()/sync() call, the temporary file is closed(), composed onto the destination file, then deleted, and a new temporary file is opened under a new filename for further writes.

This is fairly neat: it essentially says GCS does support flushing via hsync(), so if we are putting a Spark Streaming WAL on GCS we need not enable closeFileAfterWrite.
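
To make that concrete, here is a minimal sketch (my own, not from the post) of writing to GCS through the Hadoop FileSystem API and calling hsync(); the bucket and paths are placeholders, and depending on the connector version syncable output may need to be enabled explicitly.

```java
import java.net.URI;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GcsHsyncSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Depending on the GCS connector version, syncable output may need to be
        // enabled explicitly, e.g. fs.gs.outputstream.type=SYNCABLE_COMPOSITE.
        FileSystem fs = FileSystem.get(URI.create("gs://my-bucket/"), conf);

        try (FSDataOutputStream out = fs.create(new Path("gs://my-bucket/wal/records.log"))) {
            out.write("record-1\n".getBytes(StandardCharsets.UTF_8));
            // hsync() commits the bytes written so far: the connector closes the
            // current temporary object and composes it onto the destination, so
            // readers can see everything up to this point before close().
            out.hsync();

            out.write("record-2\n".getBytes(StandardCharsets.UTF_8));
        } // close() commits whatever was written after the last hsync()
    }
}
```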

S3: S3OutputStream

The S3 library, on the other hand, does implement a flush() call, but it does not necessarily upload data to the S3 server. The actual upload happens in the endBlock() call, which is invoked only conditionally from flush(). Hence the recommendation to close the file, which is guaranteed to upload any unsynced bytes to S3.
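
To illustrate why that matters, here is a deliberately simplified sketch of the pattern described above; it is not the actual Hadoop S3OutputStream source, just an illustration of flush() uploading only when a buffered block is full while close() always uploads the remainder.

```java
import java.io.IOException;
import java.io.OutputStream;

// Simplified illustration (not the real Hadoop S3OutputStream) of the behaviour
// described above: flush() uploads only when the buffered block is full, while
// close() always uploads whatever bytes remain.
class IllustrativeS3OutputStream extends OutputStream {
    private final byte[] block = new byte[4 * 1024 * 1024]; // in-memory block buffer
    private int pos = 0;

    @Override
    public void write(int b) throws IOException {
        if (pos == block.length) {
            endBlock(); // block is full, push it out before buffering more
        }
        block[pos++] = (byte) b;
    }

    @Override
    public void flush() throws IOException {
        // Upload happens only if the current block is full; a flush() on a
        // half-filled block leaves those bytes sitting in memory.
        if (pos == block.length) {
            endBlock();
        }
    }

    @Override
    public void close() throws IOException {
        // close() unconditionally uploads the remaining bytes, which is why the
        // Spark docs recommend closeFileAfterWrite for S3-backed WALs.
        if (pos > 0) {
            endBlock();
        }
    }

    private void endBlock() throws IOException {
        // Placeholder for the actual upload of the buffered block to S3.
        pos = 0;
    }
}
```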

FWIW, I ended up not using GCS for the WAL because writes to GCS couldn’t keep up with the Spark Streaming batches; I put the WAL on the local HDFS of the Dataproc cluster instead.
