You may have run into a situation where you wanted to manipulate a Spark DataFrame and save the result directly to a Hive table. If so, this article will help you understand the basics of DataFrames and how to save a DataFrame to a Hive table.
Saving a Spark DataFrame directly to a Hive table involves using the DataFrame’s write method with the saveAsTable function. By specifying the target database and table name, and optionally a save mode such as ‘overwrite’, you can persist the DataFrame into the Hive table.
Before proceeding further, let’s look at what a DataFrame is in Spark and how it is used.
DataFrames in Spark
A DataFrame is a distributed collection of data organized into named columns. DataFrames provide a higher-level API than RDDs (Resilient Distributed Datasets) and allow for better optimization, performance, and integration with various data sources. A DataFrame is comparable to a table in an RDBMS (relational database), which makes it a good fit for data manipulation and analysis tasks.
Spark and Hive Integration
To proceed, ensure that you have a cluster with the Spark and Hive services set up properly. In this example I am using the Cloudera distribution, but the steps are similar on other platforms such as Apache Hadoop or standalone Spark.
The Hive service should have HiveServer2 and the Hive metastore server up and running. The integration is available by default in the Cloudera distribution. If you are using Apache Hadoop, make sure to add hive-site.xml, hdfs-site.xml, and core-site.xml to the Spark configuration directory (conf/).
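When Spark is not wired up to Hive by the distribution, two things are worth checking in a standalone script: that the three configuration files are in place, and that the session is created with Hive support. The following is a minimal sketch rather than a definitive setup. The helper names `missing_hive_configs` and `build_hive_session` and the application name are hypothetical; `enableHiveSupport()` is the standard SparkSession builder call. In the interactive pyspark shell the `spark` session already exists, so this mainly applies to scripts.

```python
import os

# Files the article says Spark needs in its configuration directory.
HIVE_CONFIG_FILES = ("hive-site.xml", "hdfs-site.xml", "core-site.xml")


def missing_hive_configs(conf_dir):
    """Return the expected config files that are absent from conf_dir."""
    return [
        name for name in HIVE_CONFIG_FILES
        if not os.path.isfile(os.path.join(conf_dir, name))
    ]


def build_hive_session(app_name="hive-example"):
    """Create (or reuse) a SparkSession that uses the Hive metastore.

    The import is deferred so the file check above works without PySpark.
    """
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .appName(app_name)      # hypothetical application name
        .enableHiveSupport()    # use the Hive metastore as the catalog
        .getOrCreate()
    )
```

In a script, you might call `missing_hive_configs` on your Spark conf directory (assumed here to be `$SPARK_HOME/conf`) and only call `build_hive_session()` when it returns an empty list.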
Save Spark DataFrame directly to Hive
To save a DataFrame to a Hive table using Spark, you can use the saveAsTable method. Let’s see how it is done.
Open a PySpark session:
    # pyspark
    Python 2.7.5 (default, Jun 28 2022, 15:30:04)
    [GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux2
    Type "help", "copyright", "credits" or "license" for more information.
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    23/05/31 06:44:10 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
          /_/

    Using Python version 2.7.5 (default, Jun 28 2022 15:30:04)
    SparkSession available as 'spark'.
    >>>
    # Using the SparkSession above ('spark'), create a DataFrame
    df = spark.createDataFrame([(1, "Learn"), (2, "Share"), (3, "everything")], ["id", "name"])

    # Save the DataFrame as a Hive table
    df.write.saveAsTable("default.mytable")
In the example above, we create a DataFrame df with two columns: “id” and “name”. We then use the DataFrame’s write method to save it as a Hive table. Pass saveAsTable the table name qualified with its database. Here the database is default and the table is mytable, so this creates the table mytable in the default database.
NOTE: This assumes you have the appropriate permissions to create a table in Hive. Also ensure that the Hive metastore is properly configured and accessible by Spark.
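One detail the example leaves implicit is the save mode. With no mode set, saveAsTable uses `errorifexists`, so running the same snippet a second time fails because default.mytable already exists. The sketch below wraps the call with an explicit mode. The wrapper name `save_as_hive_table` is hypothetical; `mode()` and `saveAsTable()` are the DataFrameWriter calls.

```python
# Save modes accepted by DataFrameWriter.mode().
VALID_MODES = ("append", "overwrite", "ignore", "error", "errorifexists")


def save_as_hive_table(df, table, mode="errorifexists"):
    """Save df as a Hive table, e.g. table="default.mytable".

    "overwrite" replaces an existing table, "append" adds rows to it,
    "ignore" leaves an existing table untouched, and "error" or
    "errorifexists" (the default) raise if the table already exists.
    """
    if mode not in VALID_MODES:
        raise ValueError("unsupported save mode: %r" % (mode,))
    df.write.mode(mode).saveAsTable(table)
```

For example, `save_as_hive_table(df, "default.mytable", mode="overwrite")` would replace the table created earlier instead of failing.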
You can use Beeline to connect to Hive and check that the table was created successfully:
    # beeline
    WARNING: Use "yarn jar" to launch YARN applications.
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH
    beeline> select * from default.mytable;
    OK
    1 Learn
    2 Share
    3 everything
    Time taken: 0.611 seconds, Fetched: 3 row(s)
    beeline>
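The same check can also be made without leaving the PySpark session. This is a small sketch that assumes the `spark` session from above. `table_exists` and `show_table` are hypothetical helpers built on `spark.catalog.listTables`, whose entries have a `name` attribute.

```python
def table_exists(spark, database, table):
    """Return True if `table` is listed in `database` by the Spark catalog."""
    names = [t.name.lower() for t in spark.catalog.listTables(database)]
    return table.lower() in names


def show_table(spark, database, table):
    """Print the table's rows, or a message if it is not in the catalog."""
    if table_exists(spark, database, table):
        spark.table("%s.%s" % (database, table)).show()
    else:
        print("Table %s.%s not found" % (database, table))
```

In the session above this would be called as `show_table(spark, "default", "mytable")`.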
Appending Data to an Existing Hive Table:
In some scenarios, you may need to append new data to an existing Hive table. Spark provides multiple approaches to achieve this. Here are two common methods: the insertInto function and a SQL INSERT statement.
Let’s use the same table created in the above example and append records using insertInto.
Open a PySpark session:
    # Assuming you have a SparkSession object named 'spark'
    # Create a DataFrame with new data
    df = spark.createDataFrame([(4, "Google"), (5, "likes"), (6, "us")], ["id", "name"])

    # Append the new data to an existing Hive table
    df.write.insertInto("default.mytable")
In the above example, we used the insertInto function to append new records to the existing table.
Connecting to Hive with Beeline again confirms that the new records were added:
    OK
    4 Google
    1 Learn
    5 likes
    6 us
    2 Share
    3 everything
    Time taken: 1.594 seconds, Fetched: 6 row(s)
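One thing to be aware of with insertInto: it matches DataFrame columns to table columns by position and ignores the column names. A DataFrame whose columns are in a different order than the table's would therefore land in the wrong columns. The sketch below reorders by name before inserting. `reorder_for_table` and `insert_by_name` are hypothetical helpers, and they assume the DataFrame and the table use exactly the same column names.

```python
def reorder_for_table(df_columns, table_columns):
    """Return the columns in the table's order; fail if the names differ."""
    if sorted(df_columns) != sorted(table_columns):
        raise ValueError(
            "column mismatch: DataFrame has %s, table has %s"
            % (list(df_columns), list(table_columns))
        )
    return list(table_columns)


def insert_by_name(spark, df, table):
    """Append df to an existing table, aligning columns by name first."""
    table_columns = spark.table(table).columns
    ordered = reorder_for_table(df.columns, table_columns)
    df.select(*ordered).write.insertInto(table)
```

With the table from this article, `insert_by_name(spark, df, "default.mytable")` would raise an error for a DataFrame with columns such as `["id", "us"]` rather than silently inserting it by position.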
Using SQL insert statement:
Another way to append new records to the existing table is a SQL INSERT statement:
    # Assuming you have a SparkSession object named 'spark'
    # Create a DataFrame with new data
    df = spark.createDataFrame([(7, "everyone"), (8, "likes us")], ["id", "name"])

    # Register the new DataFrame as a temporary view
    df.createOrReplaceTempView("tempview")

    # Append the new data to an existing Hive table using SQL
    spark.sql("INSERT INTO TABLE default.mytable SELECT * FROM tempview")
Both methods allow you to append data to an existing Hive table. Choose the method that suits your requirements and aligns with your preferred coding style.
Querying the table again shows the records from both appends:
    select * from default.mytable ORDER BY id;
    1 Learn
    2 Share
    3 everything
    4 Google
    5 likes
    6 us
    7 everyone
    8 likes us
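Note that `SELECT *` in the INSERT statement relies on the view's column order matching the table's. A small variation is to name the columns explicitly. In this sketch `build_insert_sql` is a hypothetical helper that only assembles the statement text; the table, view, and column names are the ones used in this article.

```python
def build_insert_sql(table, view, columns):
    """Build an INSERT ... SELECT statement with an explicit column list."""
    if not columns:
        raise ValueError("at least one column is required")
    select_list = ", ".join(columns)
    return "INSERT INTO TABLE %s SELECT %s FROM %s" % (table, select_list, view)


# Columns are listed in the target table's order: id, then name.
statement = build_insert_sql("default.mytable", "tempview", ["id", "name"])
print(statement)
# In a PySpark session, run it with: spark.sql(statement)
```

The helper does no quoting or escaping, so it is only suitable for trusted, hard-coded names like the ones here.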
Persisting data for analysis is a critical aspect of any data-driven organization. Spark’s integration with Hive provides a powerful solution for data warehousing and analytics. In this article, we explored how to save DataFrames to Hive tables using Spark. We covered the basics of DataFrames and the setup needed for Spark and Hive, demonstrated how to save a DataFrame as a Hive table, and looked at two methods for appending data to an existing Hive table.
I hope everything discussed above is clear. Please leave a comment if you have any questions.
So, dive into Spark, harness the power of Hive, and unlock the true potential of your data-driven initiatives!
Good Luck with your Learning!!