Resolve “Job aborted due to stage failure” in Spark

When it comes to troubleshooting Spark issues, one thing you get used to is knowing exactly what an error means. In this article, we will learn how to troubleshoot and resolve the “Job aborted due to stage failure” error in Spark.

“org.apache.spark.SparkException: Job aborted due to stage failure” indicates that a task has failed after exceeding the retry limit (4 by default), resulting in the failure of the corresponding stage.

Before proceeding further with resolving this issue, let’s understand the basic terminology used in Spark.

Understanding Spark terminology

A Spark application is divided into jobs, each job is divided into stages, and stages are further divided into tasks. Let’s see what each of these means.

org.apache.spark.SparkException: Job aborted due to stage failure

Job

A job is a complete computation composed of multiple stages. It represents a unit of work that needs to be executed within a Spark application.

Stage

A stage is a logical division of a job, typically determined by data dependencies and the need for a shuffle. Each stage consists of tasks that can be executed in parallel, with each task operating on a data partition.

NOTE: If a stage fails for some reason, the entire Spark application is marked as failed.

Task

A Task is the basic unit of work in Spark. It is an individual computation that can be performed on a single partition of data. Tasks are the building blocks of stages and are executed concurrently on different executor nodes within a cluster.

NOTE: If a task fails 4 times, the stage hosting the task is marked as failed.

Why we see “org.apache.spark.SparkException: Job aborted due to stage failure”

To recap the terminology: a Spark application is divided into jobs -> jobs are divided into stages -> stages are divided into tasks.
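
To make this hierarchy concrete, here is a minimal PySpark sketch (it assumes the same pyspark shell used later in this article; the values and partition count are arbitrary):

# One action (collect) triggers one job; reduceByKey forces a shuffle,
# so the job runs as two stages, each made up of one task per partition
rdd = spark.sparkContext.parallelize(range(100), 4)   # 4 partitions -> 4 tasks per stage
counts = rdd.map(lambda x: (x % 10, 1)).reduceByKey(lambda a, b: a + b)
counts.collect()   # the action that triggers the job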

When a task encounters an issue and fails, Spark attempts to re-execute it, possibly on a different executor. If the task fails 4 times, the associated stage is marked as failed, and as a consequence the overall Spark job fails.

As a result, you will see the error message below in the Spark application logs:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 3.0 failed 4 times, most recent failure
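
The “failed 4 times” in this message comes from spark.task.maxFailures, which defaults to 4. The limit can be raised at submit time, although that usually only postpones the failure instead of fixing the root cause; a sketch (the script name is a placeholder):

spark-submit --master yarn --deploy-mode cluster --conf spark.task.maxFailures=8 my_app.py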

Therefore, it is important to understand the underlying issue that is causing the task to fail repeatedly. Below are a few causes I have encountered in the past.

How to replicate the issue

Replicating the error message “org.apache.spark.SparkException: Job aborted due to stage failure” is pretty simple 🙂

We just need to make a task fail multiple times, either by putting memory pressure on the task/executor or by writing code that makes the task fail continuously.

Step 1: Create a 10GB file

Create a 10 GB file using the command below:

 fallocate -l 10G 10Gigfile

Step 2: Start a PySpark session with 600MB of executor memory

pyspark --executor-memory 600M

Step 3: Run a wordcount program on top of the 10GB of data

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.8
      /_/

Using Python version 2.7.5 (default, Jun 30 2022 15:30:04)
SparkSession available as 'spark'.
>>> lines=spark.sparkContext.textFile("/tmp/10Gigfile")
>>> wordcount=lines.flatMap(lambda x: x.split(' ')).map(lambda x: (x,1)).reduceByKey(lambda x,y:x+y).map(lambda x: (x[1], x[0])).sortByKey(False).collect()

ERROR message:

Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 42, hostname, executor 16): java.io.IOException: Too many bytes before newline: 2142483648

I was expecting an OutOfMemory issue, but the file created with “fallocate” has no newlines, so a single line contains far more bytes than Spark can consume, and Spark throws the “Too many bytes before newline” exception. It still served our purpose: every task attempt failed, and once the retry limit was reached the complete Spark job was marked as failed.

To resolve this particular failure, we need to break the content into lines so that each line contains fewer words/bytes.
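
For example, instead of “fallocate”, a newline-delimited file of roughly the same size can be generated (a rough sketch assuming GNU coreutils; the repeated sentence and file name are arbitrary):

yes "this is a sample line for the wordcount test" | head -c 10G > /tmp/10Gigfile_lines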

Resolution:

As mentioned in the above section, “Job aborted due to stage failure” is actually a side effect of the underlying task failure. We need to find the exact issue and resolve it. I have shared a few scenarios I have encountered in the past.

Scenario 1

In this scenario, we can see the Spark application failed with “Job aborted due to stage failure”, but if we check further, we can see the actual issue (a serialized task was larger than spark.rpc.message.maxSize allows), which caused the task to fail 4 times:

org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 234:0 was 2482232358 bytes, which exceeds max allowed: spark.rpc.message.maxSize (134217728 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1928)

Resolution

To resolve the issue, we can increase the property to a higher value (“--conf spark.rpc.message.maxSize=1024”), either at the job level or at the cluster level.

Job Level

spark-submit --master yarn --deploy-mode cluster --conf spark.rpc.message.maxSize=1024

Cluster Level

Add “spark.rpc.message.maxSize=1024” to spark-defaults.conf on all Hadoop cluster nodes.
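
The error message also suggests using broadcast variables for large values: instead of closing over a large object (which gets serialized into every task), we can broadcast it once to the executors. A minimal PySpark sketch (the lookup dictionary is a made-up stand-in for real data):

# Hypothetical example: broadcast a large lookup table instead of
# capturing it in the task closure
large_lookup = {i: str(i) for i in range(1000000)}   # placeholder data
bcast = spark.sparkContext.broadcast(large_lookup)
rdd = spark.sparkContext.parallelize(range(100))
result = rdd.map(lambda x: bcast.value.get(x, "missing")).collect()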

Scenario 2

In this scenario, the Spark job failed with “Job aborted due to stage failure”, but the actual underlying cause of the task failure is the error “Container killed on request. Exit code is 143”.

Exit code 143 means the container received a SIGTERM signal, either from YARN or by external means (for example, someone manually running kill -15 <pid>). The most common cause is YARN killing the container because it used more memory than it was allocated.

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 35 in stage 2.0 failed 4 times, most recent failure: Lost task 35.3 in stage 2.0 (TID 1175, hostname, executor 69): ExecutorLostFailure (executor 69 exited caused by one of the running tasks) Reason: Container from a bad node: container_e13_2323242342_213245_01_000090 on host: hostname. Exit status: 143. Diagnostics: []Container killed on request. Exit code is 143

Resolution

To resolve the issue, we need to find out why the container received the SIGTERM signal and who triggered it. If it was YARN, we will be able to see relevant entries in the YARN application logs.

Example:

ExecutorLostFailure Reason: YARN Killed the running container

In the above example, we can confirm that YARN sent the SIGTERM signal to the container process, causing the 143 exit code.

In most cases, YARN kills a container in the middle of execution only when the container uses more memory than it was allocated. To resolve this, we can try tuning the memory properties or optimizing the code.

For Spark:

--executor-memory 12G
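
If the extra usage is off-heap rather than heap memory, bumping the overhead allocation can also help; a sketch at submit time (values and script name are illustrative):

spark-submit --master yarn --deploy-mode cluster --executor-memory 12G --conf spark.executor.memoryOverhead=2G my_app.py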

MR Job:

SET mapreduce.map.memory.mb=8192; 
SET mapreduce.map.java.opts=-Xmx7680M;
SET mapreduce.reduce.memory.mb=5120; 
SET mapreduce.reduce.java.opts=-Xmx4096M;

Tez:

SET hive.tez.container.size =8192;

I have written a complete article on the above “exit code 143” error; check it out if you need more information on this.

Scenario 3

Again, in this scenario, the actual cause of the job failure is an “ExecutorLostFailure”. This happens when the driver does not receive a heartbeat from an executor in time: the driver marks the executor as lost, and all the tasks running on that executor are marked as failed.

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1.0 (TID 188, hostname, executor 16): ExecutorLostFailure (executor 16 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 174247 ms

Resolution

This issue usually happens when there is a network problem. It is better to tune the timeouts below based on network performance and our use case.

set spark.executor.heartbeatInterval=30s;
set spark.network.timeout=360s;
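
The same properties can also be set when building the session in PySpark; a minimal sketch (the application name is a placeholder):

from pyspark.sql import SparkSession
# Hypothetical session with tuned heartbeat interval and network timeout
spark = (SparkSession.builder
         .appName("heartbeat-tuning-example")
         .config("spark.executor.heartbeatInterval", "30s")
         .config("spark.network.timeout", "360s")
         .getOrCreate())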

Scenario 4

In this scenario, tasks failed while fetching shuffle data from other executors (“Failed to connect to hostname:7337”). When the number of failures reaches the limit (4), the Spark job is marked as failed.

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 381 (csv at NativeMethodAccessorImpl.java:0) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Failed to connect to hostname:7337 	at 

Resolution

For this, we need to check whether there is a communication issue between the executors and the shuffle service, and we can tune the properties below:

set spark.network.timeout=300s;
set spark.shuffle.registration.timeout=30000;
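
These can also be passed at submit time; a sketch (the script name is a placeholder):

spark-submit --master yarn --deploy-mode cluster --conf spark.network.timeout=300s --conf spark.shuffle.registration.timeout=30000 my_app.py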

Conclusion

In summary, the error “org.apache.spark.SparkException: Job aborted due to stage failure” is actually a side effect of an underlying task failure. Make sure to examine the failed task for the exact reason and fix that issue. We have covered a working example to replicate and resolve the error.

Good luck with your learning!! Do share your feedback in the comment section 🙂

Related Article:

Resolve the “container exited with a non-zero exit code 143” issue in Spark, Hive, and Tez

Resolve the “java.lang.OutOfMemoryError: Java heap space” issue in Spark and Hive(Tez and MR)

How to Enable Debug Mode in spark-submit, spark-shell, and pyspark

spark.driver.memoryOverhead and spark.executor.memoryOverhead explained

How to Enable Kerberos Debugging for Spark Application and “hdfs dfs” Command
