Tez can be enabled for the entire cluster at the bootstrap level or for individual queries at runtime by setting hive.execution.engine = tez. If administrators configure Tez for the entire cluster, individual queries can still be reverted to MapReduce by setting hive.execution.engine = mr at the start of the job.
The number of tasks for each Mapper or Reducer vertex is displayed in the Logs pane. The information is displayed as A (+B, -C) / D, where:
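As a minimal sketch of switching engines per query, the statements below could be run in a Hive session. The table name web_logs is hypothetical and only serves to illustrate the pattern.

```sql
-- Run this query on Tez
set hive.execution.engine=tez;
SELECT COUNT(*) FROM web_logs;

-- Revert to MapReduce for the next query in the same session
set hive.execution.engine=mr;
SELECT COUNT(*) FROM web_logs;
```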
- A: number of completed tasks
- B: number of running tasks
- C: number of failed tasks
- D: total number of tasks.
Out-of-memory errors may occur when an exceptionally large number of tasks are executed in parallel or when too many files are involved in the split computation. Tuning the Application Master configuration can help prevent these issues. The Application Master memory is controlled with tez.am.resource.memory.mb, and a good starting point for this value may be yarn.app.mapreduce.am.resource.mb. The JVM options for the Application Master are set with tez.am.launch.cmd-opts, and the heap size (-Xmx) is typically set to about 80% of tez.am.resource.memory.mb.
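A rough sketch of the Application Master sizing follows. The 4096 MB figure is purely illustrative, and the heap value assumes the 80% guideline is applied to tez.am.resource.memory.mb (4096 x 0.8, rounded down to 3276). These settings generally take effect only when a new Application Master is launched, so many clusters place them in tez-site.xml instead of a session; note also that setting tez.am.launch.cmd-opts replaces whatever JVM options were configured there before.

```sql
-- Illustrative values only; size these for your own cluster
set tez.am.resource.memory.mb=4096;
-- Heap at roughly 80% of the Application Master memory (assumption)
set tez.am.launch.cmd-opts=-Xmx3276m;
```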
Container limitation issues may occur if the amount of memory required is more than what is available under the allocation policy. If this occurs, Tez throws an error indicating that it is “killing the container” in response to the container's memory demands. The container size is set with hive.tez.container.size and should be a multiple of yarn.scheduler.minimum-allocation-mb. The Java options for the child JVMs are controlled via hive.tez.java.opts, and the heap size should be set to approximately 80% of hive.tez.container.size.
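The sizing rule above can be sketched as follows, assuming a hypothetical cluster where yarn.scheduler.minimum-allocation-mb is 1024. A container of 4096 MB is then a valid multiple, and 80% of 4096 is about 3276 MB of heap.

```sql
-- Assumes yarn.scheduler.minimum-allocation-mb = 1024 (hypothetical cluster)
-- Container size in MB: 4 x 1024
set hive.tez.container.size=4096;
-- Heap at roughly 80% of the container size
set hive.tez.java.opts=-Xmx3276m;
```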
Split computation takes place in the Application Master. By default, the maximum split size is 1 GB and the minimum split size is 50 MB. Developers may change the split sizing policy by modifying tez.grouping.max-size and tez.grouping.min-size. Tez uses HiveInputFormat in conjunction with the grouping settings to ensure that the number of Mappers does not become a bottleneck. This differs from MapReduce, which uses CombineHiveInputFormat by default and can therefore produce fewer Mapper tasks. As a result, it can be misleading to compare the number of Mapper tasks between MapReduce and Tez to gauge performance improvements.
To enable split pruning during split computation, configure the following:
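As an illustration of adjusting the grouping bounds, the sketch below raises the minimum grouped split size to 128 MB while leaving the maximum at 1 GB. Both properties take values in bytes; the specific numbers are assumptions chosen for the example, not recommendations.

```sql
-- 128 MB = 128 * 1024 * 1024 bytes (illustrative)
set tez.grouping.min-size=134217728;
-- 1 GB = 1024 * 1024 * 1024 bytes
set tez.grouping.max-size=1073741824;
```

Larger minimum sizes generally yield fewer, bigger Mapper tasks; smaller values yield more, smaller ones.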
set hive.optimize.index.filter = true;
Parallelism across the reducers is controlled through the average reducer size in bytes, hive.exec.reducers.bytes.per.reducer. As this value decreases, more reducers are introduced so that the load is distributed across more tasks. Parallelism across the mappers is controlled through tez.grouping.split-waves, which indicates the ratio between the number of tasks per vertex and the number of available containers in the queue. As this value decreases, more parallelism is introduced, but fewer resources are allocated to a single job.
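To make the reducer arithmetic concrete, consider a hypothetical stage with about 10 GB of reducer input. At 256 MB per reducer that suggests roughly 40 reducers; halving the setting to 128 MB suggests roughly 80. This is only a back-of-the-envelope estimate: the actual count can also be affected by other settings such as hive.exec.reducers.max.

```sql
-- 256 MB per reducer: about 40 reducers for ~10 GB of input (estimate)
set hive.exec.reducers.bytes.per.reducer=268435456;

-- 128 MB per reducer: about 80 reducers for the same input (estimate)
set hive.exec.reducers.bytes.per.reducer=134217728;
```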
While often inconsequential, garbage collection can increase run time when increasingly complex data types and queries are used. The amount of time spent on garbage collection can be identified via the Tez Application UI or by enabling hive.tez.exec.print.summary. If garbage collection times are higher than acceptable or expected, consider which components of the Hive functionality may be increasing run time.
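A minimal sketch of turning on the execution summary for a session is shown below; the summary is then printed after each query completes.

```sql
-- Print a per-query execution summary after the query finishes
set hive.tez.exec.print.summary=true;
```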
When taking advantage of Map Joins in Hive, keep in mind that the larger and more complex the hash table used for the Map Join, the greater the burden on garbage collection. If the Map Join is necessary to avoid a Shuffle Join or for performance reasons, it may be necessary to increase the container size so that additional resources are available for garbage collection. If the Map Join is not needed, consider disabling it or decreasing the value of hive.auto.convert.join.noconditionaltask.size to force the query to use a Shuffle Join.
When inserting into a table that writes to an ORC file, if a large number of columns are present, consider reducing hive.exec.orc.default.buffer.size or increasing the container size.
During partitioned inserts, performance may suffer if a large number of tasks insert into multiple partitions at the same time. If this is observed, consider enabling hive.optimize.sort.dynamic.partition. Only do this when inserting into more than 10 partitions, because it can have a negative impact on performance with a very small number of partitions.
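The two options for steering a query away from a Map Join might look like the sketch below. The 10000000-byte threshold is an illustrative value, and using hive.auto.convert.join to switch off automatic conversion is an assumption about what "disabling" means here.

```sql
-- Option 1: lower the small-table threshold (in bytes) so fewer joins
-- qualify for automatic Map Join conversion (value is illustrative)
set hive.auto.convert.join.noconditionaltask.size=10000000;

-- Option 2 (assumption): turn off automatic Map Join conversion entirely
set hive.auto.convert.join=false;
```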
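As a sketch, the ORC buffer size (the property hive.exec.orc.default.buffer.size, in bytes) could be lowered for a wide table before the insert. The 64 KB value is an assumption for illustration; the right value depends on the column count and the container size.

```sql
-- 64 KB buffer per column stream (illustrative value)
set hive.exec.orc.default.buffer.size=65536;
```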
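A partitioned insert with sorted dynamic partitioning enabled might look like the sketch below. The table and column names are hypothetical, and the two dynamic-partition settings are assumed to be needed on clusters where they are not already enabled.

```sql
-- Assumed prerequisites for a fully dynamic partitioned insert
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

-- Sort rows by partition key before writing
set hive.optimize.sort.dynamic.partition=true;

-- Hypothetical tables; sale_date is the dynamic partition column
INSERT OVERWRITE TABLE sales_by_day PARTITION (sale_date)
SELECT order_id, amount, sale_date
FROM sales_staging;
```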
Developers may run the following command to trigger accurate size accounting by the compiler:
ANALYZE TABLE [table_name] COMPUTE STATISTICS FOR COLUMNS;
After executing the above statement, enable hive.stats.fetch.column.stats, which triggers the Hive physical optimizer to use more accurate per-column statistics instead of the uncompressed file size reported by HDFS. After collecting and calculating statistics, also consider enabling the cost-based optimizer (CBO) with hive.cbo.enable. Keep in mind that not all operators are supported; refer to the Hive documentation for a complete list.
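Put together, the statistics workflow described above could be sketched as follows. The table name sales is hypothetical.

```sql
-- Gather per-column statistics for a hypothetical table
ANALYZE TABLE sales COMPUTE STATISTICS FOR COLUMNS;

-- Let the optimizer use the collected column statistics
set hive.stats.fetch.column.stats=true;

-- Enable the cost-based optimizer
set hive.cbo.enable=true;
```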