“Futures timed out” issue in Spark

“Futures timed out” is a common error when running Spark applications. In this article, we will learn why a Spark application fails with this error and how to fix it.

The “Futures timed out” error occurs when the Spark ApplicationMaster or an Executor fails to receive updates within the configured timeout. More specifically, if the SparkContext is not initialized within “spark.yarn.am.waitTime”, or if an Executor does not send a heartbeat to the Driver within “spark.executor.heartbeatInterval”, you will get the “Futures timed out” error.
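Under the hood, this error is a plain `java.util.concurrent.TimeoutException` thrown when a blocking wait on a future exceeds its deadline. The standalone Java snippet below (illustrative only, no Spark involved; the class and method names are our own) reproduces the mechanism:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureTimeoutDemo {

    // Simulate a task that needs ~500 ms to finish.
    static String slowTask() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "done";
    }

    // Wait on the task with a deadline; returns true if the wait timed out,
    // which is exactly the situation behind "Futures timed out after [...]".
    static boolean waitTimesOut(long timeoutMillis) {
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(FutureTimeoutDemo::slowTask);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return false; // task finished within the deadline
        } catch (TimeoutException e) {
            return true;  // deadline expired first
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("timed out: " + waitTimesOut(100));
    }
}
```

Spark's Application Master and Executor hit the same kind of deadline: some component is waiting on a future (context initialization, a broadcast, a heartbeat reply) and the other side does not respond in time.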


Quick introduction to the Application Master and Executor

Application Master

The Application Master acts as a mini resource manager inside the Spark application. It uses a YARN allocator to manage the YARN containers for executors. Whenever the driver requests an executor, the Application Master negotiates with the Resource Manager (YARN) to host the executor.

Executor

Executors are the worker containers that run the actual tasks. They are launched by the Application Master based on requests from the Spark driver.

Scenario 1: Futures timed out in Application Master

When the Spark application runs in cluster mode, the Application Master starts the user application code in a separate thread.

By default, “spark.yarn.am.waitTime” is set to 100 seconds. If Spark initialization takes longer than this timeout, the application fails with the “Futures timed out” error.

In this scenario, you will see the error message below in the application container logs. Note that “yarn.ApplicationMaster” is throwing the error, which tells us the issue is on the Application Master side:

22/03/07 18:27:54 ERROR yarn.ApplicationMaster: Uncaught exception:
java.util.concurrent.TimeoutException: Futures timed out after [100000 milliseconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)
        at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:448)
        at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:276)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:821)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:820)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
        at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:820)
        at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)

Reason 1:

If the user's Spark application code has a potentially long-running constructor, or runs other user logic before creating the SparkContext, any delay in that work counts against the wait time. If the code does not reach

SparkContext sc = new SparkContext(sparkConf);

within the timeout, the above “java.util.concurrent.TimeoutException: Futures timed out” error occurs.
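The pattern is easiest to see side by side. A minimal sketch (assumes the Spark core library on the classpath; `loadReferenceData()` is a hypothetical stand-in for any slow pre-Spark setup):

```java
// Sketch only: requires Spark on the classpath; not runnable standalone.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class App {
    public static void main(String[] args) {
        // Anti-pattern: heavy setup BEFORE the context is created counts
        // against spark.yarn.am.waitTime in cluster mode.
        // loadReferenceData();  // hypothetical slow service call or file scan

        // Better: create the SparkContext first, then do the heavy work.
        SparkConf conf = new SparkConf().setAppName("my-app");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // loadReferenceData();  // heavy initialization after the context exists

        sc.stop();
    }
}
```

Once the context exists, the Application Master has registered the driver, so the wait-time clock is no longer a concern.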

Reason 2:

This can also happen when a network issue delays communication with the Application Master, causing the “Futures timed out” error.

Resolution:

To resolve this issue, review your Spark application code and apply the following:

1. Optimize the runtime of the initialization code

2. Initialize the SparkContext as early as possible

SparkContext sc = new SparkContext(sparkConf);

3. Increase the wait time by adding the following configuration to your spark-submit command:

--conf spark.yarn.am.waitTime=300s
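Putting it together, a cluster-mode submission with a raised wait time might look like this (the class and jar names are placeholders):

```
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.yarn.am.waitTime=300s \
  --class com.example.MyApp \
  my-app.jar
```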

Scenario 2: Futures timed out during Broadcast join 

The “Futures timed out” error can also happen when Spark attempts a broadcast hash join and one of the DataFrames is large enough that preparing the broadcast takes longer than the timeout value (“spark.sql.broadcastTimeout”, 300 seconds by default).

In this stack trace, the reference to “org.apache.spark.sql.execution.joins.BroadcastHashJoin” tells us the issue is due to the broadcast join:

scheduler.TaskSetManager: Finished task 154.0 in stage 4.0 (TID 839) in 220 ms on <Hostname> (399/400)
java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)

Resolution:

To resolve the issue, we can:

1. Set spark.sql.broadcastTimeout to a higher value

--conf spark.sql.broadcastTimeout=<value>

2. persist() both DataFrames in the code, so that Spark uses a shuffle join instead

3. Disable broadcasting altogether

--conf spark.sql.autoBroadcastJoinThreshold=-1
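These settings can also be applied programmatically. A minimal sketch, assuming the Spark SQL library on the classpath; the table names and the join key are hypothetical:

```java
// Sketch only: requires Spark SQL on the classpath; not runnable standalone.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JoinFix {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("join-fix").getOrCreate();

        // Option 1: give the broadcast more time (value is in seconds).
        spark.conf().set("spark.sql.broadcastTimeout", "600");

        // Option 3: disable automatic broadcasting entirely, forcing a
        // shuffle (sort-merge) join instead.
        spark.conf().set("spark.sql.autoBroadcastJoinThreshold", "-1");

        Dataset<Row> left  = spark.table("left_table");   // hypothetical table
        Dataset<Row> right = spark.table("right_table");  // hypothetical table

        // Option 2: persisting both sides also steers Spark away from
        // planning a broadcast join.
        Dataset<Row> joined = left.persist().join(right.persist(), "id");
        joined.write().saveAsTable("joined_table");

        spark.stop();
    }
}
```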

Scenario 3: Futures timed out in Executor

In this scenario, the Executor was unable to communicate with the Driver for 10 seconds, the default “spark.executor.heartbeatInterval”, causing the “Futures timed out” error.

In this stack trace, the “executor.Executor” logger confirms the issue is on the Executor side:

21/10/21 02:22:53 WARN executor.Executor: Issue communicating with driver in heartbeater
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:844)
at org.apache.spark.executor.Executor$$anonfun$1.apply$mcV$sp(Executor.scala:174)
at org.apache.spark.executor.Executor$$anonfun$1.apply(Executor.scala:174)
at org.apache.spark.executor.Executor$$anonfun$1.apply(Executor.scala:174)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1993)
at org.apache.spark.Heartbeater$$anon$1.run(Heartbeater.scala:51)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)

Reason:

This can happen if a network or host issue leaves the Driver or Executor in a hung state, so the Executor cannot deliver its heartbeat to the Driver in time.

Resolution:

To resolve the issue, increase the properties below in your Spark configuration. Note that “spark.executor.heartbeatInterval” should be significantly less than “spark.network.timeout”:

--conf spark.executor.heartbeatInterval=180s
--conf spark.network.timeout=300s
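Applied to a submission, the settings look like this (the class and jar names are placeholders):

```
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.executor.heartbeatInterval=180s \
  --conf spark.network.timeout=300s \
  --class com.example.MyApp \
  my-app.jar
```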

Conclusion

In summary, we have discussed the various scenarios in which the “Futures timed out” error appears, in the Application Master, during broadcast joins, and in the Executor, along with ways to resolve each of them.

Good luck with your learning!
