How to run Spark job with Ozone Filesystem

Apache Spark is a popular distributed computing framework for big data processing. Apache Ozone is a distributed object store for the Hadoop ecosystem that provides a Hadoop-compatible filesystem interface as well as an S3-compatible API. Using the filesystem interface, we can read and write data in the Ozone filesystem from Spark.

Check out the article I have written comparing the performance and use cases of the Ozone and HDFS filesystems.

Prerequisites

Before we begin, ensure you have the following:

  • Apache Hadoop Ozone installed and configured.
  • Spark libraries installed and configured.

In this article, we will walk through a step-by-step procedure to run a Spark job against the Ozone filesystem (Cloudera distribution), with all the data files stored in Ozone. Before running the Spark job, we will see how to upload data to the Ozone filesystem.

Set up Ozone filesystem

Let’s create a volume and a bucket in the Ozone filesystem. These are Ozone's storage elements: a volume holds buckets, and a bucket holds keys (the files).

1. Create a volume 

$ ozone sh volume create /volume
23/03/02 09:33:40 INFO rpc.RpcClient: Creating Volume: volume, with test_user as owner and space quota set to -1 bytes, counts quota set to -1

2. Create a bucket

$ ozone sh bucket create /volume/bucket
23/03/02 09:34:17 INFO rpc.RpcClient: Creating Bucket: volume/bucket, with the Bucket Layout null, test_user as owner, Versioning false, Storage Type set to DISK and Encryption set to false 

3. Add the property fs.o3fs.impl to core-site.xml.

<property>  
<name>fs.o3fs.impl</name>
<value>org.apache.hadoop.fs.ozone.OzoneFileSystem</value>
</property>

If you are using the Cloudera distribution, you can add the property under the HDFS advanced configuration for core-site.xml.
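To confirm that the property is actually present after editing the file, a small check can help. The following is a minimal sketch, not part of the original setup: the helper name read_hadoop_property and the sample XML string are assumptions for illustration, and on a real node you would read your own core-site.xml, whose location varies by installation.

```python
import xml.etree.ElementTree as ET


def read_hadoop_property(xml_text, name):
    """Return the value of a Hadoop <property> by name, or None if absent."""
    root = ET.fromstring(xml_text)
    for prop in root.iter("property"):
        if prop.findtext("name", default="").strip() == name:
            return prop.findtext("value", default="").strip()
    return None


# Sample content mirroring the property above. On a cluster node you would
# pass the text of your own core-site.xml instead (its path is site-specific).
SAMPLE_CORE_SITE = """
<configuration>
  <property>
    <name>fs.o3fs.impl</name>
    <value>org.apache.hadoop.fs.ozone.OzoneFileSystem</value>
  </property>
</configuration>
"""

print(read_hadoop_property(SAMPLE_CORE_SITE, "fs.o3fs.impl"))
```

If the function returns None for fs.o3fs.impl, the o3fs:// scheme is probably not registered in that configuration file.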

4. Run a Wordcount job with Ozone filesystem using pyspark

– Create a text file with words

vi /tmp/wordcount.txt
“Learn Share is a platform to share knowledge in which others can able to learn from your mistake and experience”

– Upload file to the Ozone filesystem

$ ozone sh key put /volume/bucket/wordcount.txt  /tmp/wordcount.txt
23/03/02 10:46:59 WARN impl.MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-xceiverclientmetrics.properties,hadoop-metrics2.properties
23/03/02 10:46:59 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
23/03/02 10:46:59 INFO impl.MetricsSystemImpl: XceiverClientMetrics metrics system started
23/03/02 10:47:00 INFO metrics.MetricRegistries: Loaded MetricRegistries class org.apache.ratis.metrics.impl.MetricRegistriesImpl
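The volume, bucket, and key commands used so far can also be scripted. The sketch below only assembles the same three ozone sh commands shown above and prints them; the function names ozone_setup_commands and run_all are hypothetical helpers, and actually executing the commands (dry_run=False) assumes the ozone CLI is on the PATH.

```python
import subprocess


def ozone_setup_commands(volume, bucket, key, local_path):
    """Build the argv lists for: volume create, bucket create, key put."""
    vol_path = "/" + volume
    bucket_path = vol_path + "/" + bucket
    return [
        ["ozone", "sh", "volume", "create", vol_path],
        ["ozone", "sh", "bucket", "create", bucket_path],
        ["ozone", "sh", "key", "put", bucket_path + "/" + key, local_path],
    ]


def run_all(commands, dry_run=True):
    """Print each command; execute it only when dry_run is False."""
    for argv in commands:
        print(" ".join(argv))
        if not dry_run:
            # check=True stops at the first command that exits non-zero
            subprocess.run(argv, check=True)


commands = ozone_setup_commands("volume", "bucket", "wordcount.txt", "/tmp/wordcount.txt")
run_all(commands)  # dry run: only prints the three commands
```

Keeping the dry run as the default makes it easy to review the exact commands before anything is created on the cluster.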

– Access the file in the Ozone filesystem using the hdfs CLI

hdfs dfs -ls o3fs://bucket.volume.<service id>/

Replace the bucket name, volume name, and Ozone service id with the values for your cluster.

NOTE: Check your Ozone configuration for the service id, “ozone.service.id”. In my cluster it is ozone.

hdfs dfs -ls o3fs://bucket.volume.ozone/
Found 2 items
-rw-rw-rw-   3 learn learn         46 2023-03-02 09:36 o3fs://bucket.volume.ozone/data.csv
-rw-rw-rw-   3 learn learn        112 2023-03-02 10:47 o3fs://bucket.volume.ozone/wordcount.txt
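Because the bucket comes before the volume in the o3fs:// address, it is easy to swap the two by mistake. A minimal sketch of a helper that assembles the path in the order used above; o3fs_uri is a hypothetical name introduced here, not an Ozone or Spark API.

```python
def o3fs_uri(bucket, volume, service_id, path=""):
    """Build o3fs://<bucket>.<volume>.<service id>/<path>."""
    for label, part in (("bucket", bucket), ("volume", volume), ("service_id", service_id)):
        if not part or "/" in part:
            raise ValueError("invalid %s: %r" % (label, part))
    # Drop any leading slash so the result has exactly one after the authority
    return "o3fs://%s.%s.%s/%s" % (bucket, volume, service_id, path.lstrip("/"))


print(o3fs_uri("bucket", "volume", "ozone", "wordcount.txt"))
# prints: o3fs://bucket.volume.ozone/wordcount.txt
```

The resulting string can be passed to hdfs dfs commands or to Spark read and write calls in place of a hand-typed path.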

– Create a pyspark session

$ pyspark
Python 2.7.5 (default, Oct 14 2020, 14:45:30) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/03/02 10:28:54 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.8
      /_/
Using Python version 2.7.5 (default, Oct 14 2020 14:45:30)
SparkSession available as 'spark'.
>>>

– Run the Wordcount program by pointing the location to the Ozone filesystem

>>> data=sc.textFile("o3fs://bucket.volume.ozone/wordcount.txt")
>>> wordcount=data.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)
>>> output=wordcount.collect()
>>> for (word, count) in output:                                                
    print("%s: %i" % (word, count))

Output:

a: 1
and: 1
from: 1
able: 1
is: 1
share: 1
experience: 1
Share: 1
which: 1
Learn: 1
knowledge: 1
platform: 1
learn: 1
to: 2
can: 1
in: 1
your: 1
others: 1
mistake: 1

>>> 

In the above example, the text file lives in the Ozone filesystem and “data” is an RDD read from it. We use the “flatMap”, “map”, and “reduceByKey” transformations to split each line into words, pair every word with 1, and sum the counts per word.

Finally, we trigger an action, collect(), to bring the final result back to the driver and print it in a human-readable format.
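To see what each transformation contributes, the same logic can be mirrored in plain Python without a cluster. This is only an illustrative sketch on the sample sentence; it runs on a single machine and does not use Spark at all.

```python
from collections import defaultdict
from itertools import chain

# The content of /tmp/wordcount.txt, one list element per line
lines = ["Learn Share is a platform to share knowledge in which others "
         "can able to learn from your mistake and experience"]

# flatMap: split every line and flatten the results into one sequence of words
words = list(chain.from_iterable(line.split(" ") for line in lines))

# map: pair each word with a count of 1
pairs = [(word, 1) for word in words]

# reduceByKey: combine the values of identical keys with x + y
counts = defaultdict(int)
for word, n in pairs:
    counts[word] += n

print(len(words), len(counts), counts["to"])
# prints: 20 19 2
```

The 19 distinct keys match the 19 lines of output above, with “to” being the only word that appears twice. Note that “Share” and “share” are counted separately because the split is case-sensitive.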

– Write to Ozone filesystem using pyspark

Instead of printing the result to the console as in the example above, we can write the output back to the Ozone filesystem as shown below.

>>> data=sc.textFile("o3fs://bucket.volume.ozone/wordcount.txt")
>>> wordcount=data.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)
>>> wordcount.saveAsTextFile("o3fs://bucket.volume.ozone/output")

$ hdfs dfs -ls o3fs://bucket.volume.ozone/output/
Found 3 items
-rw-rw-rw-   3 learn learn          0 2023-03-02 10:55 o3fs://bucket.volume.ozone/output/_SUCCESS
-rw-rw-rw-   3 learn learn        120 2023-03-02 10:55 o3fs://bucket.volume.ozone/output/part-00000
-rw-rw-rw-   3 learn learn        141 2023-03-02 10:55 o3fs://bucket.volume.ozone/output/part-00001

$ hdfs dfs -cat o3fs://bucket.volume.ozone/output/part-00000
23/03/02 10:59:31 WARN impl.MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-xceiverclientmetrics.properties,hadoop-metrics2.properties
23/03/02 10:59:31 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
23/03/02 10:59:31 INFO impl.MetricsSystemImpl: XceiverClientMetrics metrics system started
(u'a', 1)
(u'and', 1)
(u'from', 1)
(u'able', 1)
(u'is', 1)
(u'share', 1)
(u'experience', 1)
(u'Share', 1)
(u'which', 1)

In the above example, we computed the word count for the given text file and wrote the output back to the Ozone filesystem using the saveAsTextFile method, which writes the RDD to the Ozone path as one part file per partition.
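The part files above contain lines such as (u'a', 1) because saveAsTextFile writes the string form of each RDD element, here a Python tuple. If a plainer format is preferred, the pairs can be converted to text first. A minimal sketch, where format_pair and parse_saved_line are hypothetical helper names introduced for illustration:

```python
import ast


def format_pair(pair):
    """Turn a (word, count) tuple into a tab-separated line."""
    word, count = pair
    return "%s\t%d" % (word, count)


def parse_saved_line(line):
    """Parse a line already saved in tuple form, e.g. (u'a', 1)."""
    word, count = ast.literal_eval(line.strip())
    return word, count


print(format_pair(("to", 2)))
print(parse_saved_line("(u'a', 1)"))
```

In the pyspark shell, the formatter would be applied before saving, for example wordcount.map(format_pair).saveAsTextFile(...) with a new output path, so that each line reads as a word, a tab, and its count.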

Conclusion

In this article, we discussed how to run a Spark job with the Ozone filesystem. We first created a volume and a bucket in Ozone, registered the o3fs filesystem in core-site.xml, uploaded a text file, and finally read and wrote data in Ozone from a pyspark session. With this knowledge, you can use Spark and Ozone together to process data stored in Ozone.

For more information about the various Ozone command-line tools and the Ozone shell, see the Apache Ozone documentation.

Good Luck with your Learning !!
