Handling S3 Eventual Consistency with Spark: A Deep Dive

Problem

Most of the time, S3 eventual consistency bites us in the listing API. When we list a directory in S3 in which files have recently been deleted or added, the listing API can either show us “ghost” files (files which have been deleted but are still present in the listing) or miss “conceived” files (files which have been added but are not yet returned by the listing API).

Ghost Files

Ghost files cause problems because any operation on such a file is likely to fail with a FileNotFoundException.

Conceived Files

Conceived files usually don’t cause any problem at the Spark job level; instead, they cause problems by silently leaving data out of the final results.
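
For illustration, here is a rough sketch of how a ghost file typically surfaces. The bucket and prefix names are hypothetical, and spark is assumed to be an existing SparkSession (as in spark-shell): the listing returns an entry, but opening it fails.

import java.io.FileNotFoundException
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical prefix in which files were recently deleted.
val prefix = new Path("s3://my-bucket/recently-changed-prefix/")
val fs = FileSystem.get(new URI(prefix.toString), spark.sparkContext.hadoopConfiguration)

fs.listStatus(prefix).foreach { status =>
  try {
    val in = fs.open(status.getPath)   // a "ghost" entry fails here
    in.close()
  } catch {
    case _: FileNotFoundException =>
      println(s"Listed but no longer present (ghost file): ${status.getPath}")
  }
}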

Solutions

At Qubole we have addressed these issues in some of the problematic areas. Here is how you can identify which solution is appropriate for your use case.

FileOutputCommitter

Qubole Spark uses DirectFileOutputCommitter by default. This avoids the listing API calls used in the jobCommit phase of the usual FileOutputCommitter: by writing directly to the final location, DirectFileOutputCommitter avoids the jobCommit phase altogether. However, when doing insert into/append operations on Spark tables, Spark forces the use of FileOutputCommitter. FileOutputCommitter v1 performs a copy operation in the jobCommit phase which not only takes a lot of time, but is also likely to fail because of eventual consistency issues.

Qubole offers the following two settings for dealing with eventual consistency issues in the FileOutputCommitter:

If using FileOutputCommitter v1, set:

--conf spark.hadoop.mapreduce.use.parallelmergepaths=true

Parallel merge paths makes the jobCommit phase run in multiple threads, making it faster, and also retries the individual steps if they fail instead of giving up on the first error.

Or use FileOutputCommitter v2:

--conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2
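
As a rough sketch (the application and table names below are hypothetical), either of the settings above can be supplied when the SparkSession is built, or equivalently via --conf on spark-submit, so that it is in effect for appends that fall back to FileOutputCommitter:

import org.apache.spark.sql.SparkSession

// Sketch only: app and table names are hypothetical. The committer setting
// must be in place before the write runs, so it is supplied at session-build
// (or spark-submit) time rather than mid-job.
val spark = SparkSession.builder()
  .appName("append-with-foc-v2")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .enableHiveSupport()
  .getOrCreate()

// Appending to a table forces Spark to use FileOutputCommitter,
// so the committer configuration above governs this write.
spark.range(1000).toDF("id")
  .write
  .mode("append")
  .saveAsTable("my_db.events")   // hypothetical table

The same pattern applies to spark.hadoop.mapreduce.use.parallelmergepaths if you stay on FileOutputCommitter v1.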

Writing to Hive Tables

Eventual consistency issues are also common when writing to Hive tables from Spark. Hive table writes are done by Spark first writing the results to a temporary location on S3, followed by rename/copy operations that move the files from the temporary S3 location to the final table location. If writing to Hive tables, please set:

--conf spark.hadoop.hive.qubole.consistent.loadpartition=true

This setting improves the speed of the file copy operations by parallelising them over a thread pool, and also handles common failure scenarios by automatically ignoring or retrying operations as appropriate.

--conf spark.qubole.sql.hive.useDirectWrites=true
(Available from Jan 2018)

This setting does away with writing to a temporary location and copying files; instead, it writes files directly to the final table location. It not only improves the speed of writing to Hive tables from Spark, but, by avoiding the extra writes and S3 listing, also avoids running into eventual consistency issues.
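
Here is a minimal sketch of a Hive table write with direct writes enabled. The database and table names are hypothetical; hive.qubole.consistent.loadpartition would be passed the same way if you stay on the temporary-location-plus-copy path instead:

import org.apache.spark.sql.SparkSession

// Sketch only: database/table names are hypothetical. The setting is
// supplied at session-build (or spark-submit) time.
val spark = SparkSession.builder()
  .appName("hive-table-write")
  .config("spark.qubole.sql.hive.useDirectWrites", "true")
  .enableHiveSupport()
  .getOrCreate()

// With direct writes, the result files go straight to the final table
// location instead of a temporary S3 location followed by a copy.
spark.table("my_db.staging_events")
  .write
  .mode("append")
  .insertInto("my_db.daily_events")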

Reading from recently written tables

Many times we have seen users write to a location and then immediately read it back in the very next line of Scala/Python code. Such reads are susceptible to eventual consistency failures, which usually appear in the form of a FileNotFoundException. If possible, avoid such code patterns. The following setting comes in handy if you cannot avoid them:

--conf spark.sql.qubole.ignoreFNFExceptions=true

As you can guess, we simply ignore the “ghost” files here, i.e. files that were returned by the S3 listing API but no longer exist. Another cause of the same failure can be a concurrent delete by some other job.
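
The write-then-read-back pattern described above looks roughly like the sketch below (the S3 path is hypothetical and spark is an existing SparkSession, as in spark-shell). spark.sql.qubole.ignoreFNFExceptions=true would be passed at launch so that ghost files returned by the S3 listing are skipped instead of failing the read:

// Sketch only: hypothetical output path.
val outputPath = "s3://my-bucket/tmp/results"

spark.range(1000).toDF("id")
  .write
  .mode("overwrite")
  .parquet(outputPath)

// Reading back immediately after the write is the pattern that is
// susceptible to eventual consistency failures.
println(spark.read.parquet(outputPath).count())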

Another, more aggressive, setting for ignoring all kinds of errors during read operations is:

--conf spark.sql.files.ignoreCorruptFiles=true

From the Spark documentation: if spark.sql.files.ignoreCorruptFiles is true, Spark jobs will continue to run when encountering corrupted or non-existing files, and the contents that have been read will still be returned.
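
Unlike the Hadoop committer settings earlier, this is a Spark SQL conf, so in this sketch it is toggled on an existing session just around the lenient read (the path is hypothetical):

// Sketch only: enable lenient reads for this step, then restore the default.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
val df = spark.read.parquet("s3://my-bucket/tmp/results")   // hypothetical path
println(df.count())
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "false")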

DFOC & speculation

By default, Qubole disables speculation; DirectFileOutputCommitter (the Qubole default) does not work well with speculation. If you need to enable speculation, please make sure to disable DirectFileOutputCommitter:

--conf spark.hadoop.mapred.output.committer.class=org.apache.hadoop.mapred.FileOutputCommitter

--conf spark.hadoop.mapreduce.use.directfileoutputcommitter=false

--conf spark.hadoop.spark.sql.parquet.output.committer.class=org.apache.parquet.hadoop.ParquetOutputCommitter
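
As a rough sketch (the application name is hypothetical), the three settings above can be combined with spark.speculation=true when the session is built, or passed as --conf flags on spark-submit:

import org.apache.spark.sql.SparkSession

// Sketch only: enable speculation and switch back to the standard committers.
val spark = SparkSession.builder()
  .appName("job-with-speculation")   // hypothetical name
  .config("spark.speculation", "true")
  .config("spark.hadoop.mapred.output.committer.class",
    "org.apache.hadoop.mapred.FileOutputCommitter")
  .config("spark.hadoop.mapreduce.use.directfileoutputcommitter", "false")
  .config("spark.hadoop.spark.sql.parquet.output.committer.class",
    "org.apache.parquet.hadoop.ParquetOutputCommitter")
  .enableHiveSupport()
  .getOrCreate()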

 
