Spark SQL - Performance Tuning tips
In this article I am sharing my understanding and experiences with Spark SQL since we just completed migrating 300+ tables in my domain, from Teradata based ETL processes to Spark SQL process using HDFS as backed end data storage cluster.
How can we maintain the distribution of data like we have in Teradata based on Primary Index?
Spark does support this feature using “distribute by” method. As the name reflects it distributes the data based on the columns/(valid expressions) you pass it distributes the data across the partitions evenly. Of course if the columns/(valid expressions) chosen by us itself is not balanced, again we will have uneven distribution of data. Which you may even have experienced in Teradata by choosing inappropriate primary index column.
Well, why do we have to distribute the data evenly in all partitions?
Answer is simple. Let’s say in your team there are 4 engineers and your scrum lead is assigning the work load unevenly, will it be of any good? NO.
So distributing the data evenly to eliminate the data skew, will help in improving the join operations on such data sets, and performing any sort of transformations on such data with the use of distributed columns in filter conditions.
How does distribute by clause helps in selecting a particular record from a table?
Yes, when you trigger such select query based on a filter condition, it finds the hash value of the bucketed column/s and finds the exact partition and search for the record you are looking for. As any search would love sorted data, Spark does support sorting the data while writing the partitions to disk. This needs special method called “sort by <column/s>”
So to get the much needed benefits while reading data from Spark table, it is worth applying bucketing sorting and sorting even though it is little expensive. This helps our customers read SQLs perform much better.
So far so good.
What about controlling the number of partitions in a spark table? Is it better to have more partitions with smaller size or smaller number of partitions with bigger size?
Answer is simple. Hadoop is meant for big data. It does not like to deal with smaller size files. So, can control the number of files/partitions creation in spark using set spark.shuffle.partitions = <N> where N being the desired number of partitions.
Any other tricks/configurations that we could play with to improve Spark SQLs performance?
Sure. there are lot more configurations that we can customize. But sometimes too much customization without knowing the impact may lead to negative results. So, try to review the data you are dealing with, and do some tests in dev/qa environments to understand the impact of such configuration changes.
Here is some list of configurations you can start playing with:
- spark.submit.deployMode=cluster
- spark.yarn.queue=<queue name>
- spark.driver.memory=<desired memory> — like 10g
- spark.executor.cores=<desired cores>
- spark.executor.memory=<desired memory>
- spark.sql.shuffle.partitions=10 — remember smaller number of partitions with bigger size is better. But it should be balances. Go by block size and data size.
- spark.executor.heartbeatInterval=20s
- spark.task.maxFailures=4
- spark.rpc.numRetries=4
- spark.rpc.askTimeout=600s
- spark.network.timeout=600s
- spark.memory.fraction=0.75
- spark.memory.storageFraction=0.5
- spark.scheduler.listenerbus.eventqueue.size=100000
- spark.speculation.interval=5000ms
- spark.speculation=true
- spark.rdd.compress=true
- spark.ui.view.acls=*
- spark.ui.view.acls.groups=*
- spark.shuffle.service.enabled=true
- spark.executor.extraJavaOptions=”-XX:ParallelGCThreads=3"
- spark.sql.autoBroadcastJoinThreshold=104857600
- spark.sql.codegen.wholeStage=true
- spark.dynamicAllocation.initialExecutors=10
- spark.dynamicAllocation.minExecutors=10
- spark.dynamicAllocation.maxExecutors=500
- spark.dynamicAllocation.enabled=true
- spark.yarn.scheduler.heartbeat.interval-ms=100000
- spark.yarn.maxAppAttempts=2
- spark.sql.adaptive.enabled=true
- spark.sql.adaptive.maxNumPostShufflePartitions=10000
- spark.sql.adaptive.shuffle.targetPostShuffleInputSize=100m
Background:
In recent past, I worked on an interesting project that was aimed at migrating an enterprise data warehouse that is built on well known MPP Relational data base management software Teradata to Hadoop using Spark as the processing engine.
Our team comes with lot of SQL background, and existing ETL processes are completely built on custom SQL script ; we decided to rewrite the BTEQ SQL scripts in Spark SQL.
During this process, we encountered many issues mainly around understanding the the similarities and dissimilarities between Teradata and Spark architecture, functions and their behavior. Well this is enough background on the situation, and actions. Lets get into more real and fun stuff.
Comments
Post a Comment