How to Save Spark DataFrame directly to Hive

You may have encountered a situation where you wanted to do some manipulation on a Spark DataFrame and save it directly to a Hive table. If so, this article will help you understand the basics of DataFrames and how to access and save a DataFrame to a Hive table.

Saving a Spark DataFrame directly to a Hive table involves using the DataFrame’s write method with the saveAsTable function. By specifying the desired database and table name, and selecting the ‘overwrite’ mode, the DataFrame can be seamlessly persisted into the Hive table:

df.write.mode('overwrite').saveAsTable('database.table')

Before proceeding further, let’s understand DataFrames in Spark and their use.

DataFrames in Spark

DataFrames are a distributed collection of data organized into named columns. They provide a higher-level API than RDDs (Resilient Distributed Datasets) and allow for better optimization, performance, and integration with various data sources. DataFrames can easily be compared to tables in an RDBMS (relational database), which makes them an ideal choice for data manipulation and analysis tasks.
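To make this concrete, here is a minimal sketch (the column names and values are made up for illustration; it assumes a SparkSession named 'spark', as the pyspark shell provides by default):

# Build a small DataFrame with named columns
people = spark.createDataFrame([(1, "Alice", 30), (2, "Bob", 25)], ["id", "name", "age"])

# Query it much like a table in a relational database
people.filter(people.age > 26).select("name").show()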

Spark and Hive Integration

To proceed further, ensure that you have a cluster with the Spark and Hive services set up properly. In this example, I am using the Cloudera distribution, but the process is similar on other platforms such as Apache Hadoop or standalone Spark.

The Hive service should have HiveServer2 and the Hive Metastore Server up and running. In the Cloudera distribution the integration is available by default; if you are using Apache Hadoop, make sure to add hive-site.xml, hdfs-site.xml, and core-site.xml under the Spark configuration.
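If you build the SparkSession yourself (for example, in a spark-submit script rather than the pyspark shell), Hive support must be enabled explicitly. A minimal sketch, where the application name is an arbitrary placeholder:

from pyspark.sql import SparkSession

# enableHiveSupport() lets saveAsTable talk to the Hive metastore
spark = SparkSession.builder \
    .appName("save-to-hive-example") \
    .enableHiveSupport() \
    .getOrCreate()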

Save Spark DataFrame directly to Hive

To save a DataFrame to a Hive table using Spark, you can use the saveAsTable method. Let’s see how it is done.

Open a PySpark session:

# pyspark
Python 2.7.5 (default, Jun 28 2022, 15:30:04) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/31 06:44:10 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 2.7.5 (default, Jun 28 2022 15:30:04)
SparkSession available as 'spark'.
>>> 

# Using the SparkSession above ('spark'), let's create a DataFrame

# Create a DataFrame
df = spark.createDataFrame([(1, "Learn"), (2, "Share"), (3, "everything")], ["id", "name"])

# Save the DataFrame as a Hive table
df.write.saveAsTable("default.mytable")

In the example above, we create a DataFrame df with two columns, “id” and “name”. We then use the write method of the DataFrame to save it as a Hive table. Make sure to pass the fully qualified table name to the saveAsTable function. In this example, the database is default and the table name is mytable, so the call creates a table mytable under the default database.
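saveAsTable also accepts a save mode. By default it fails if the table already exists; ‘overwrite’ replaces the table, and ‘append’ adds rows to it. A quick sketch of the common variants:

# Default behaviour: error out if default.mytable already exists
df.write.saveAsTable("default.mytable")

# Replace the table and its data
df.write.mode("overwrite").saveAsTable("default.mytable")

# Add rows to the existing table
df.write.mode("append").saveAsTable("default.mytable")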

NOTE: This assumes you have the appropriate permissions to create a table in Hive. Also, ensure that the Hive metastore is properly configured and accessible by Spark.
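A quick way to confirm that Spark can actually reach the Hive metastore is to list the databases and tables it sees:

# If the integration is working, these show the Hive catalog
spark.sql("SHOW DATABASES").show()
spark.sql("SHOW TABLES IN default").show()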

Validation

You can use Beeline to connect to Hive and check whether the table was created successfully:

# beeline
WARNING: Use "yarn jar" to launch YARN applications.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH
beeline> select * from default.mytable;
OK
1	Learn
2	Share
3	everything
Time taken: 0.611 seconds, Fetched: 3 row(s)
beeline>
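You can also validate without leaving the PySpark session, since Spark reads the table back through the same metastore:

# Query the Hive table directly from Spark
spark.sql("SELECT * FROM default.mytable").show()

# Equivalent shorthand
spark.table("default.mytable").show()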

Appending Data to an Existing Hive Table

In some scenarios, you may need to append new data to an existing Hive table. Spark provides multiple approaches to achieve this. Here are two common methods:

Using the insertInto method

Let’s use the same table created in the example above and try to append records using the insertInto method.

Create a PySpark session:

pyspark
# Assuming you have a SparkSession object named 'spark'

# Create a DataFrame with new data
df = spark.createDataFrame([(4, "Google"), (5, "likes"), (6, "us")], ["id", "name"])

# Append the new data to an existing Hive table
df.write.insertInto("default.mytable")

In the example above, we used the insertInto function to append new records to the existing table default.mytable.
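One caveat: unlike saveAsTable, insertInto resolves columns by position, not by name, so the DataFrame’s columns must line up with the table’s column order. A defensive sketch:

# insertInto matches columns by position, so select them
# in the table's order (id, name) before writing
df.select("id", "name").write.insertInto("default.mytable")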

Validation

Use Beeline to connect to Hive and check whether the new records were added successfully. As the output below shows, they were:

OK
4	Google
1	Learn
5	likes
6	us
2	Share
3	everything
Time taken: 1.594 seconds, Fetched: 6 row(s)

Using a SQL INSERT statement

Another method to append new records to an existing table is to register the DataFrame as a temporary view and run a SQL INSERT through spark.sql:

# Assuming you have a SparkSession object named 'spark'

# Create a DataFrame with new data
df = spark.createDataFrame([(7, "everyone"), (8, "likes us")], ["id", "name"])

# Register the new DataFrame as a temporary table
df.createOrReplaceTempView("tempview")

# Append the new data to an existing Hive table using SQL
spark.sql("INSERT INTO TABLE default.mytable SELECT * FROM tempview")

Both methods allow you to append data to an existing Hive table. Choose the method that suits your requirements and aligns with your preferred coding style.
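A third option, not shown above, is the ‘append’ save mode with saveAsTable, which resolves columns by name rather than by position:

# Append by name: the DataFrame's column names must match the table's schema
df.write.mode("append").saveAsTable("default.mytable")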

Validation

select * from default.mytable ORDER BY id;

1	Learn
2	Share
3	everything
4	Google
5	likes
6	us
7	everyone
8	likes us

Conclusion

Persisting data for analysis is a critical aspect of any data-driven organization. Spark’s integration with Hive provides a powerful solution for data warehousing and analytics. In this article, we explored how to save DataFrames to Hive tables using Spark. We have covered the basics of DataFrames, the setup process for Spark and Hive, and demonstrated how to save a DataFrame as a Hive table. Additionally, we discussed methods for appending data to an existing Hive table.

I hope you have understood all the aspects discussed above. Do comment if you have any questions.

So, dive into Spark, harness the power of Hive, and unlock the true potential of your data-driven initiatives!

Good Luck with your Learning!!

Related Topics:

Resolve the “User application exited with status 1” issue in Spark

How to Access HBase from Spark

Resolve the “java.lang.OutOfMemoryError: Java heap space” issue in Spark and Hive(Tez and MR)

How to Run the Spark history server locally

Script to collect thread dump (Jstack)
