There are many use cases where Spark needs to store data in Kudu and retrieve it again. This article walks through the procedure for accessing Kudu from Spark, step by step.
Apache Kudu is a columnar storage engine built for fast analytics on data. To access a Kudu table from Spark, add the kudu-spark dependency package; Spark can then read from and write to Kudu storage through the data source API.
The steps below show how to access Kudu from Spark.
The following prerequisites must be met before proceeding with the examples:
- Apache Spark 2.4 or later (earlier versions are not supported for this integration)
- Kudu client libraries
- Kudu master address (that is, a Kudu service must be up and running in your cluster)
The Kudu client libraries for Spark can be downloaded from the official Apache Kudu website, or installed through a package manager such as yum.
If you are using the Cloudera distribution, the kudu-spark integration jar is already included in the parcel. Make sure the Kudu client libraries are available on every node of the Spark cluster.
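A quick way to confirm that the integration jar is present on a node is to look for it in the jar directory. The following is a minimal Python sketch, not an official tool: the helper name find_kudu_spark_jars is hypothetical, and the directory is the Cloudera parcel path used later in this article, which may differ on your installation.

```python
import glob
import os


def find_kudu_spark_jars(jar_dir):
    """Return the sorted paths of kudu-spark jars found in jar_dir."""
    # The pattern also matches related artifacts whose names start with
    # "kudu-spark", so review the list before picking one.
    pattern = os.path.join(jar_dir, "kudu-spark*.jar")
    return sorted(glob.glob(pattern))


if __name__ == "__main__":
    # Assumed location: the Cloudera parcel jar directory; adjust as needed.
    jars = find_kudu_spark_jars("/opt/cloudera/parcels/CDH/jars")
    if jars:
        print("Found: " + ", ".join(jars))
    else:
        print("No kudu-spark jar found on this node")
```

Running the script on each node of the Spark cluster shows whether the jar is missing anywhere.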
Accessing the Kudu table from Spark
Step 1: Create a Kudu table
I am using Impala to create the Kudu table because it is comparatively simple and quick.
-- Using impala-shell to create this table
CREATE TABLE kudu_spark (
  id INT,
  name STRING,
  PRIMARY KEY(id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;
-- Inserting data into the Kudu table
insert into table kudu_spark values(1,"test1");
insert into table kudu_spark values(2,"test2");
insert into table kudu_spark values(3,"test5");
insert into table kudu_spark values(4,"test5");
insert into table kudu_spark values(5,"test5");
Step 2: Initialize the Spark session using the kudu-spark_<version> artifact
In this example I am using the Cloudera distribution, so I point directly at the integration jar in the Cloudera parcel directory, as below:
spark-shell --jars /opt/cloudera/parcels/CDH/jars/kudu-spark2_2.11-<version>.jar
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/03/01 05:27:20 WARN conf.HiveConf: HiveConf of name hive.masking.algo does not exist
23/03/01 05:27:28 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
Spark context Web UI available at http://<hostname>:4040
Spark context available as 'sc' (master = yarn, app id = application_1677622668603_0002).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.8
      /_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_232)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
Step 3: Create a DataFrame and configure it to use the Kudu data source
// Import the required libraries
import org.apache.kudu.client._
import org.apache.kudu.spark.kudu.KuduContext
import collection.JavaConverters._
// Configure the read to use the Kudu data source
val df = spark.read.options(Map("kudu.master" -> "kudumasterservername:7051", "kudu.table" -> "default.kudu_spark")).format("kudu").load
df: org.apache.spark.sql.DataFrame = [id: int, name: string]
In the above step, we created a DataFrame by passing the necessary configuration, such as the Kudu master address and the table name, to the data source.
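For PySpark users, the same configuration can be kept in one place and reused. The sketch below is only an illustration: the helper names kudu_options and load_kudu_table are hypothetical, and the master address in the usage comment is a placeholder. It uses the fully qualified data source name that appears in the PySpark example at the end of this article.

```python
KUDU_FORMAT = "org.apache.kudu.spark.kudu"


def kudu_options(master, table):
    """Build the option map the Kudu data source reads its settings from."""
    return {"kudu.master": master, "kudu.table": table}


def load_kudu_table(spark, master, table):
    """Load a Kudu table as a DataFrame through Spark's data source API."""
    reader = spark.read.format(KUDU_FORMAT)
    # Dotted keys cannot be written as keyword arguments, so unpack a dict.
    return reader.options(**kudu_options(master, table)).load()


# Usage inside the pyspark shell, where `spark` already exists
# (the master address below is a placeholder):
# df = load_kudu_table(spark, "<kudumasterhostname>:7051", "default.kudu_spark")
```

Keeping the options in a single function means the master address is written once, which helps when the same cluster is used for several tables.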
Point to note: If you see “Unauthorized action” while creating the DataFrame as above, make sure the user running the Spark application has permission to query the Kudu table. If you are using Ranger, grant the user access to the Kudu table under the cm_kudu policy. The error looks like this:
org.apache.kudu.client.NonRecoverableException: Unauthorized action
    at org.apache.kudu.client.KuduException.transformException(KuduException.java:110)
    at org.apache.kudu.client.KuduClient.joinAndHandleException(KuduClient.java:470)
    at org.apache.kudu.client.KuduClient.openTable(KuduClient.java:288)
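In a PySpark job, this failure can be recognized by its message so that the log points at the likely cause. The following is a rough sketch under the assumption that the Java exception text, including “Unauthorized action”, is carried in the string form of the Python exception; the helper names are hypothetical.

```python
def is_kudu_unauthorized(error):
    """Return True when the exception text carries Kudu's authorization message."""
    return "Unauthorized action" in str(error)


def load_with_hint(load_fn):
    """Call load_fn(); print a permissions hint on authorization failure, then re-raise."""
    try:
        return load_fn()
    except Exception as error:
        if is_kudu_unauthorized(error):
            print("Hint: grant the Spark user access to the Kudu table "
                  "(for example through the Ranger policy).")
        raise


# Usage, with `spark` from the pyspark shell and a placeholder master address:
# df = load_with_hint(lambda: spark.read.format("org.apache.kudu.spark.kudu")
#                     .option("kudu.master", "<kudumasterhostname>:7051")
#                     .option("kudu.table", "default.kudu_spark").load())
```

The original exception is always re-raised, so the full stack trace is still available for troubleshooting.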
NOTE: At this point the Kudu table is still not accessible from Spark SQL. To query it with SQL, we need to create a view on top of the DataFrame (this is a limitation) so that it can be handled as a table. The next step shows how, with examples.
Step 4: Read data from a Kudu table using the spark.read.format() method
// Create a view from the DataFrame to make it accessible from Spark SQL.
df.createOrReplaceTempView("my_table")
// Now we can run Spark SQL queries against our view of the Kudu table.
spark.sql("select * from my_table").show()
scala> spark.sql("select * from my_table").show()
23/03/01 05:37:44 WARN conf.HiveConf: HiveConf of name hive.masking.algo does not exist
Hive Session ID = 8eb4606a-d696-488d-93f4-56d3f2e35080
+---+-----+
| id| name|
+---+-----+
|  4|test5|
|  1|test1|
|  5|test5|
|  2|test2|
|  3|test5|
+---+-----+

scala>
In the above example, we used the spark.read.format() method to read data from a Kudu table. The table is specified through the “kudu.table” option, which here is set to default.kudu_spark.
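The same view-then-query pattern works from PySpark. The sketch below wraps the two calls in one function; the name query_kudu_view is hypothetical, and the filter in the usage comment is only an example query against the my_table view.

```python
def query_kudu_view(spark, df, view_name, query):
    """Register df as a temporary view, then run a Spark SQL query against it."""
    df.createOrReplaceTempView(view_name)
    return spark.sql(query)


# Usage inside the pyspark shell, with `df` already loaded from Kudu:
# query_kudu_view(spark, df, "my_table",
#                 "select id, name from my_table where id > 2").show()
```

The view exists only for the lifetime of the Spark session, so it has to be registered again in each new session.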
Step 5: Write data to a Kudu table using the df.write.format() method
We can create an empty Kudu table (again through Impala) to validate that Spark writes the data properly.
CREATE TABLE kudu_spark_write (
  id INT,
  name STRING,
  PRIMARY KEY(id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;
val df = spark.read.options(Map("kudu.master" -> "kudumasterservername:7051", "kudu.table" -> "default.kudu_spark")).format("kudu").load
// kudu.master is passed explicitly on the write as well
df.write.options(Map("kudu.master" -> "kudumasterservername:7051", "kudu.table" -> "default.kudu_spark_write")).mode("append").format("kudu").save()
Now we can query the new table from Impala and see the data:
select * from kudu_spark_write;
In the above example, we use the DataFrame writer, df.write.format(), to write data to a Kudu table. We specify the Kudu table name through the kudu.table option and set the write mode to append with the mode() method, so that new data is appended to the existing table.
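A PySpark counterpart of the write can be sketched as follows. The function name append_to_kudu is hypothetical and the master address in the usage comment is a placeholder; the options and the append mode are the ones used in the Scala example above.

```python
def append_to_kudu(df, master, table):
    """Append the rows of df to an existing Kudu table."""
    writer = (
        df.write.format("org.apache.kudu.spark.kudu")
        .option("kudu.master", master)
        .option("kudu.table", table)
        .mode("append")
    )
    # save() triggers the actual write; nothing is sent before this call.
    writer.save()


# Usage inside the pyspark shell, with `df` read from default.kudu_spark:
# append_to_kudu(df, "<kudumasterhostname>:7051", "default.kudu_spark_write")
```

The target table must already exist, which is why the empty table is created through Impala first.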
To learn more about the table structure, we can run the “describe” command, which lists the columns and their data types:
spark.sql("describe my_table").show()
+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|      id|      int|   null|
|    name|   string|   null|
+--------+---------+-------+

scala>
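In PySpark, the same column information is also available directly on the DataFrame, without going through a view. The sketch below relies on the DataFrame's dtypes attribute, which holds (column name, type name) pairs; the helper name column_types is hypothetical.

```python
def column_types(df):
    """Return a {column name: Spark SQL type name} mapping for a DataFrame."""
    # df.dtypes is a list of (name, type) string pairs.
    return dict(df.dtypes)


# Usage inside the pyspark shell, with `df` loaded from the Kudu table:
# for name, type_name in sorted(column_types(df).items()):
#     print(name, type_name)
```

This is handy in scripts that need to check a table's columns before writing to it.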
PySpark code for the above example
All the examples so far were written in Scala (spark-shell). Below is the equivalent code in Python (pyspark):
pyspark --jars /opt/cloudera/parcels/CDH/jars/kudu-spark2_2.11-<version>.jar
23/03/01 06:25:24 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.8.<version>
      /_/

Using Python version 2.7.5 (default, Oct 14 2020 14:45:30)
SparkSession available as 'spark'.
>>> df = spark.read.format('org.apache.kudu.spark.kudu').option('kudu.master',"<kudumasterhostname>:7051").option('kudu.table',"default.kudu_spark").load()
>>> df.show()
+---+-----+
| id| name|
+---+-----+
|  4|test5|
|  1|test1|
|  5|test5|
|  2|test2|
|  3|test5|
+---+-----+

>>>
In this article, we explained how to access a Kudu table from Spark. We covered both reading, with the spark.read.format() method, and writing, with the df.write.format() method, using very simple examples for better understanding. If you want to learn more about Kudu and Spark, check out the official Apache Kudu website and the Apache Spark website for additional resources.
Good luck with your learning!