How To: Spark SQL Tuning

Spark SQL

While Spark accepts SQL, the framework translates those commands into code that is executed by the executors. As a result, there are several tuning considerations that can affect Spark SQL performance.

File Formats

Developers and administrators may set the default data source format used for all operations with spark.sql.sources.default. By default, Spark SQL uses its own native Parquet support instead of the Hive SerDe for better performance; this behavior is controlled by spark.sql.hive.convertMetastoreParquet. The Spark documentation contains additional details about the considerations and assumptions made when Hive metastore Parquet tables are converted to Spark SQL's native format during processing.
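
As a minimal sketch (Scala), assuming an application that builds its own SparkSession, these settings can be supplied when the session is created; the application name and values are illustrative placeholders, not recommendations.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("file-format-tuning")                             // placeholder name
  .config("spark.sql.sources.default", "parquet")            // default data source format
  .config("spark.sql.hive.convertMetastoreParquet", "true")  // use Spark's native Parquet support
  .enableHiveSupport()
  .getOrCreate()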

Split Optimization

In Spark versions 1.5.1 and above, you can enable this optimization by adding the following to the Spark application's command line: --conf spark.sql.qubole.split.computation=true. Once enabled, Qubole leverages its bulk S3 listing optimization to perform faster split computation for Spark SQL queries. This benefits partitioned Hive tables queried through Spark SQL, for all formats that depend on FileInputFormat. It does not affect unpartitioned tables or Parquet/ORC tables, which have their own mechanisms for listing S3 files.
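
For illustration, a spark-submit invocation carrying this flag might look like the following; the application class and jar names are hypothetical placeholders.

spark-submit --class com.example.SqlJob --conf spark.sql.qubole.split.computation=true sql-job.jar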

Table Caching

Developers and administrators may cache tables in an in-memory columnar format using spark.catalog.cacheTable("tableName") (sqlContext.cacheTable("tableName") in older releases) or dataFrame.cache(). Spark SQL will automatically tune compression when spark.sql.inMemoryColumnarStorage.compress is true and will scan only the required columns. The size of the column batches is controlled with spark.sql.inMemoryColumnarStorage.batchSize; larger batch sizes can improve memory utilization and compression but risk out-of-memory errors. Tables may be uncached using spark.catalog.uncacheTable("tableName").
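
As a minimal sketch (Scala), assuming an active SparkSession named spark and a registered table called sales (both placeholders), caching and uncaching might look like this; the batch size is illustrative.

// Illustrative values; tune batchSize to the available executor memory.
spark.conf.set("spark.sql.inMemoryColumnarStorage.compress", "true")
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "10000")

spark.catalog.cacheTable("sales")    // cache in the in-memory columnar format
spark.sql("SELECT region, SUM(amount) FROM sales GROUP BY region").show()
spark.catalog.uncacheTable("sales")  // release the cached data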

Join Optimization

Developers and administrators can potentially reduce resource consumption during the shuffle stage of execution by broadcasting the data of the smaller table in a join to every executor (a broadcast join). Spark SQL broadcasts a table automatically when its size falls below spark.sql.autoBroadcastJoinThreshold.
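
As a minimal sketch (Scala), again assuming an active SparkSession named spark, with orders and customers as placeholder table names and an illustrative threshold value:

import org.apache.spark.sql.functions.broadcast

// Broadcast any table smaller than roughly 50 MB (value is in bytes; illustrative).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)

val orders = spark.table("orders")        // large fact table (placeholder)
val customers = spark.table("customers")  // small dimension table (placeholder)

// The broadcast hint forces the small side to be broadcast regardless of the threshold.
orders.join(broadcast(customers), Seq("customer_id")).show()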


Shuffle Partitions

Spark SQL shuffles data across partitions to perform many of the operations submitted via SQL, such as aggregations, groupings, joins, and filters. The number of partitions involved in the shuffle, and thus the degree of parallelism, is determined by spark.sql.shuffle.partitions (200 by default). As noted in the previous section, a broadcast join configured with spark.sql.autoBroadcastJoinThreshold can eliminate the shuffle for one side of a join entirely.
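
As a minimal sketch (Scala), assuming an active SparkSession named spark; the partition count and table name are illustrative placeholders:

// Raise shuffle parallelism for a wide aggregation (default is 200).
spark.conf.set("spark.sql.shuffle.partitions", "400")

val counts = spark.sql("SELECT region, COUNT(*) AS cnt FROM sales GROUP BY region")
println(counts.rdd.getNumPartitions)  // post-shuffle partition count (may be coalesced when adaptive execution is enabled)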

Partition Bytes

The maximum number of bytes packed into a single partition when reading files, and thus the upper bound on partition size, is determined by spark.sql.files.maxPartitionBytes. The estimated cost to open a file, measured in the number of bytes that could be scanned in the same time, is set with spark.sql.files.openCostInBytes. Note that the Spark documentation recommends overestimating this value so that partitions with small files are scheduled before partitions with large files.
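
As a minimal sketch (Scala), assuming an active SparkSession named spark; the S3 path and byte values are illustrative placeholders, not recommendations.

// Pack at most 64 MB into each input partition; treat opening a file as
// costing the equivalent of 8 MB of scanning (both values illustrative).
spark.conf.set("spark.sql.files.maxPartitionBytes", (64 * 1024 * 1024).toString)
spark.conf.set("spark.sql.files.openCostInBytes", (8 * 1024 * 1024).toString)

val events = spark.read.parquet("s3://my-bucket/path/to/events")  // placeholder path
println(events.rdd.getNumPartitions)  // inspect the resulting number of input partitions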

