Reference: FileAlreadyExistsException in Spark jobs



A Qubole Cluster Node runs one or more Spark Executors. Each Spark executor runs in its own Java Virtual Machine (JVM). Each Spark Executor can run multiple Spark tasks.




To manage these resources, there is Apache YARN ( Yet Another Resource Negotiator ). These resources, storage, memory, cpu, etc, like anything else in this physical world we live in, are fixed. If Spark Executor needs more than it currently has, it's got to come from somewhere. Enter the concept of preemption. Webster's dictionary has several definitions, one of which is "to seize upon to the exclusion of others". Bingo! The side effects include out of memory errors and file already exists errors. We will discuss these here.

By default the in QDS sets the following:


yarn.scheduler.fair.preemption = false


Setting or overriding this to true can lead to the seizing of resources to the exclusion of others, as Webster's tells us.

When this happens you may experience ExecutorLossFailure, you should see an error that looks like the following:


Lost executor 99 on Container container_1493123456668_0099_01_12345600
on host: was preempted 


This can have several side effects.

The first is that failure of the previous task can leave behind files that will trigger file already exist errors. Typically something like the following:


org.apache.hadoop.fs.FileAlreadyExistsException: s3://some/data/path/


Should one of your Spark executors fail, Spark will retry to start the task. Retries of the same task can find that the file was already present. By default, the number of times to restart a task gives up is 3. Hence, retries of the same task can result in file already exist errors, as shown above.

Another potential cause, though rarer in our experience, is Executor going out of memory and individual tasks of that one are scheduled on another executor which will lead to the FileAlreadyExists exception.


Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 3 times, most recent failure: Lost task 1.3 in stage 2.0 (TID 7, ip-192-168-1- 1.ec2.internal, executor 4): ExecutorLostFailure (executor 3 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 14.3 GB of 14 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead


Potential Solutions


There are several things at play here. First and foremost when you override YARN's default preemption of false, you give YARN the permission and ability to make some decisions for you. These can result in errors that need to be taken in context together.

First turn do not override YARN's preemption enablement. Make sure this is set to false as shown below.


yarn.scheduler.fair.preemption = false


Second, take a look at the size of the nodes in the clusters. You may indeed need to up them to the next tier, so that you can boost Spark executor's memory overhead.




Many thanks to Sandeep Badam for his help and guidance.



Have more questions? Submit a request


Powered by Zendesk