What are broadcast variables in Spark

Broadcast variables are commonly used by Spark developers to optimize their code for better performance. This article provides an in-depth explanation of what broadcast variables are, how they work, and why they are important.

Broadcast variables in Apache Spark are a powerful feature for sharing read-only data across nodes, reducing network traffic, and improving performance. Created using SparkContext.broadcast(), they are accessed in tasks through their value attribute. This feature optimizes workflows and helps Spark applications scale to handle large datasets more efficiently.


Introduction:

Spark is an open-source distributed computing system that is widely used for big data processing. Spark can handle large datasets very efficiently by distributing the data across multiple nodes in a cluster. It achieves this through its Resilient Distributed Datasets (RDDs), which are immutable, distributed collections of objects.
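
As a quick illustration, here is a minimal PySpark sketch of an RDD; it assumes a running SparkContext named sc, such as the one provided by the pyspark shell.

# Minimal sketch: assumes a running SparkContext named `sc`
# The list is split into partitions and distributed across the cluster
numbers = sc.parallelize([1, 2, 3, 4, 5])

# Transformations build new immutable RDDs; actions trigger the computation
squares = numbers.map(lambda n: n * n)
print(squares.collect())   # [1, 4, 9, 16, 25]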

In addition to RDDs, Spark offers another useful feature that helps improve performance: broadcast variables. Broadcast variables allow Spark to efficiently share read-only data across the nodes in a cluster.

What are broadcast variables in Spark?

Broadcast variables are read-only variables that are cached on each worker node in a cluster. They are used to store data that is common to all tasks running on the worker nodes. When a Spark job is executed, Spark serializes the broadcast variable and sends it to each worker node, where it is cached in memory. This allows each task running on the worker node to access the data without having to transfer it over the network repeatedly.

Broadcast variables can be created in Spark by calling the following method.

SparkContext.broadcast(<arg1>) 

The method takes a single argument: the data that needs to be broadcast. The data can be of any serializable type, such as an integer, a string, a dictionary, or a custom class.
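
As a minimal sketch (assuming a running SparkContext named sc, as in the pyspark shell), a broadcast variable can be created from any serializable value and read back through its value attribute:

# Assumes a running SparkContext named `sc` (e.g. from the pyspark shell)

# Any serializable value can be broadcast: numbers, strings, dicts, custom objects ...
threshold = sc.broadcast(100)
country_names = sc.broadcast({"US": "United States", "IN": "India"})

# The broadcast data is read through the .value attribute
print(threshold.value)             # 100
print(country_names.value["IN"])   # India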

How do broadcast variables work?

Once you create a broadcast variable, Spark serializes the data and sends it to each worker node in the cluster. The data is then cached in memory on each worker node, making it available to all tasks running on that node. Since the data is cached in memory, it can be accessed much faster than if it had to be transferred over the network repeatedly (which is costlier).

When a task running on a worker node needs to access a broadcast variable:

  • The task first checks whether the variable is already cached in memory.
  • If it is not cached, the task sends a request to the driver node to retrieve the variable.
  • The driver then sends the serialized variable back to the worker node, where it is cached in memory and made available to all tasks running on that node (see the sketch below).
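
The sketch below shows where this lazy fetch happens in practice: the dictionary is broadcast once from the driver, and each worker fetches and caches it the first time one of its tasks reads value. The data and names here are made up for illustration, and a running SparkContext named sc is assumed.

# Hypothetical sketch: assumes a running SparkContext named `sc`
rates = sc.broadcast({"USD": 1.0, "EUR": 1.1, "INR": 0.012})

amounts = sc.parallelize([("USD", 10), ("EUR", 5), ("INR", 200)])

# .value is resolved inside the task; the first access on a worker triggers
# the fetch described above, after which the dict is served from the local cache
in_usd = amounts.map(lambda kv: kv[1] * rates.value.get(kv[0], 0.0))
print(in_usd.collect())   # e.g. [10.0, 5.5, 2.4]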

Why are broadcast variables important?

Broadcast variables are an important feature in Spark because they can significantly improve the performance of Spark jobs. Without broadcast variables, Spark needs to transfer read-only data over the network repeatedly for each task running on each worker node, which can become a bottleneck. This results in significant network overhead and can slow down the job.

By caching read-only data in memory on each worker node, Spark can reduce the amount of network traffic and improve the performance of its jobs. This is particularly important for jobs that involve large read-only datasets.
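
To make the difference concrete, here is an illustrative (hypothetical) comparison, again assuming a SparkContext named sc: referencing a large local dictionary directly inside a lambda ships it as part of the task closure, while broadcasting it sends it to each worker only once and caches it there.

# Hypothetical sketch: assumes a running SparkContext named `sc`
big_lookup = dict((str(i), "name_%d" % i) for i in range(1000000))

rdd = sc.parallelize(range(1000), 100)   # 100 partitions -> 100 tasks

# Without a broadcast variable: `big_lookup` is serialized as part of the
# task closure and re-shipped every time an action runs over this lineage
names_slow = rdd.map(lambda i: big_lookup.get(str(i), "Unknown"))

# With a broadcast variable: the dictionary is sent to each worker once,
# cached in memory there, and reused across tasks, stages, and jobs
bc_lookup = sc.broadcast(big_lookup)
names_fast = rdd.map(lambda i: bc_lookup.value.get(str(i), "Unknown"))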

Real-world example: PySpark

Let’s say you have a large dataset of customer transactions, and you want to join it with a smaller lookup table that maps customer IDs to customer names.

Without using broadcast variables, the lookup table would need to be transferred over the network to each worker node every time a task needs it, even though the lookup table is the same for all tasks. This can generate a lot of network traffic and slow down the job.

Using broadcast variables, the lookup table can be cached in memory on each worker node, and all tasks can access it without having to transfer it over the network. This can significantly reduce the amount of network traffic and improve the performance of the job.

Example:

# Load customer transactions dataset
transactions = sc.textFile("hdfs://<Namenodehostname>:8020/tmp/transactions.csv")

# Load customer lookup table (small)
lookup_table = {"1": "Tom", "2": "Jane", "3": "Bob"}

# Create broadcast variable for customer lookup table
broadcast_customer_lookup = sc.broadcast(lookup_table)

# Join transactions with the broadcast lookup table (a map-side lookup, no shuffle needed)
joined_data = transactions.map(lambda x: (x.split(',')[0], x)) \
                          .map(lambda x: (x[0], broadcast_customer_lookup.value.get(x[0], 'Unknown'), x[1]))

# Save joined data to output file
joined_data.saveAsTextFile("hdfs://<Namenodehostname>:8020/tmp/output")


Sample Dataset for "/tmp/transactions.csv"

customer_id,transaction_amount,transaction_date
1,100.00,2022-01-01
2,50.00,2022-01-02
3,75.00,2022-01-03
1,200.00,2022-01-04
2,25.00,2022-01-05
3,125.00,2022-01-06
4,150.00,2022-01-07
1,50.00,2022-01-08
2,75.00,2022-01-09
3,100.00,2023-01-10


Output:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 2.7.5 (default, Oct 14 2020 14:45:30)
SparkSession available as 'spark'.
>>> transactions = sc.textFile("/tmp/transactions.csv")
>>> lookup_table = {"1": "Tom", "2": "Jane", "3": "Bob"}
>>> broadcast_customer_lookup=sc.broadcast(lookup_table)
>>> joined_data = transactions.map(lambda x: (x.split(',')[0], x)) \
...                           .map(lambda x: (x[0], broadcast_customer_lookup.value.get(x[0], 'Unknown'), x[1]))
>>> joined_data.collect()
[(u'1', 'Tom', u'1,100.00,2022-01-01'), (u'2', 'Jane', u'2,50.00,2022-01-02'), (u'3', 'Bob', u'3,75.00,2022-01-03'), (u'1', 'Tom', u'1,200.00,2022-01-04'), (u'2', 'Jane', u'2,25.00,2022-01-05'), (u'3', 'Bob', u'3,125.00,2022-01-06'), (u'4', 'Unknown', u'4,150.00,2022-01-07'), (u'1', 'Tom', u'1,50.00,2022-01-08'), (u'2', 'Jane', u'2,75.00,2022-01-09'), (u'3', 'Bob', u'3,100.00,2023-01-10'), (u'', 'Unknown', u'')]
>>> 

NOTE: Instead of saving the results to a file, I have used collect() to show them in the console.

In this example, we load the customer transactions dataset from the HDFS location and define a small customer lookup table as a Python dictionary. We then create a broadcast variable for the lookup table using sc.broadcast().

Next, we join the transactions with the lookup table using the customer ID as the key. Since the lookup table has been broadcast to all nodes, each task performs the lookup locally (a map-side join) without transferring the table over the network for every task.

Finally, we save the joined data to an output file in HDFS, or we can use collect() to show the results in the console.

Conclusion

In conclusion, broadcast variables are an important feature in Spark that allows read-only data to be cached in memory on each worker node in a cluster. This reduces network traffic and improves the performance of Spark jobs.

Broadcast variables are easy to create in Spark (as explained in detail in this article) and can be used for any type of serializable data. Using broadcast variables helps programmers build faster and more efficient big data applications.

Good Luck with your Learning !!
