Hive Dynamic Partitioning Tips

There are several cases in which dynamic partitioning can be used to improve query performance. However, incorrect use of dynamic partitioning can also have a detrimental effect. In this article we cover two such cases and provide tips that help address the problems that particular uses of dynamic partitioning can cause.

CASE 1

A very common mistake with dynamic partitioning is a query like the one below:

insert into table abc partition( ds ) select * from src;

The above query is fine if you have very few partitions and the number of mappers reading from src is very limited. Otherwise, its effects are:

  • It runs as a map-only job
  • It generates up to roughly M*P files in total ( P = number of partitions, M = total mappers ), so each partition typically ends up with about M files ( assuming the data is widely spread and every mapper encounters at least one row for each 'ds' value )

Side effects of this:

  • A huge number of small files per partition
  • S3 503 and 500 errors, because all the mappers are writing into the same location. If M=10000 and P=365, the job may end up with 3.65 million files, all of them very small. If 1000 mappers are running at a time on a decent cluster, they might be writing roughly 365k files almost in parallel ( if not literally at the same time ). This triggers 503 (throttling) errors from S3; sometimes the job fails, performance degrades, and so on.
  • Small files cause issues in downstream jobs that read the table, because split computation over 3.65 million files takes far longer than over a few hundred files
  • Memory issues during the dynamic insert: each mapper may be running parallel writers, each with its own internal buffers, so mappers can fail with OOM. This is especially likely with ORC and Parquet ( see the settings sketch after this list )
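Hive also exposes a handful of settings that govern dynamic partition inserts. The values below are only a sketch to adapt, since defaults and availability differ between Hive versions; hive.optimize.sort.dynamic.partition is worth calling out because it sorts rows by the partition key so each task keeps only one open writer ( and one ORC/Parquet buffer ) at a time, which directly targets the OOM issue in the last bullet.

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;       -- allow all partition columns to be dynamic
set hive.exec.max.dynamic.partitions=2000;            -- overall cap on partitions one insert may create
set hive.exec.max.dynamic.partitions.pernode=500;     -- per-task cap
set hive.exec.max.created.files=1000000;              -- hard cap on total files created by the insert
set hive.optimize.sort.dynamic.partition=true;        -- one open writer per task; helps ORC/Parquet memory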

Solution:

The solution to the problems above is to use 'distribute by' so that the insert happens on the reduce side, and to distribute by the partition key so that each reducer writes to its own partition(s).


For example, the query above was writing a year's worth of data into a daily-partitioned table.
Instead it could simply be this:

set mapred.reduce.tasks=365;
insert into table abc partition( ds ) select * from src distribute by ds;

What if it is writing 10 years' worth of data? In that case the above query is still fine, because each reducer then writes into about 10 partitions.

What if it is writing only 30 really big partitions, and runs on a giant cluster where it is undesirable to under-utilize the cluster with just 30 reducers? Going with a low reduce count has two side effects:

  • Under-utilization of the cluster
  • Extra-big files ( assuming those partitions are really big, individual files may reach 10 GB+ ). In that case the reducers also spend significant time in the final copy.

In such a case, let's say the cluster has capacity for 600 reducers but we only have 30 partition values. We can do this:

set mapred.reduce.tasks=600;
insert into table abc partition( ds ) select * from src distribute by ds, <col>;

Here <col> is a very high-granularity column, so based on the col value about 20 reducers will process each ds partition. That way you still utilize your cluster, avoid OOM issues, and avoid the large-file/small-file issues.
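As a concrete sketch of that pattern ( the column name user_id and the bucket count 20 are made up for the example ), you can bound the number of files per partition explicitly by distributing on a computed bucket instead of the raw high-granularity column:

set mapred.reduce.tasks=600;
insert into table abc partition( ds ) select * from src distribute by ds, pmod( hash( user_id ), 20 );

Each ds value then maps to at most 20 distinct ( ds, bucket ) keys, so no partition should end up with more than about 20 files, while up to 600 reducers stay busy across the 30 partitions.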

 

CASE 2


Hive and Hadoop were originally designed with HDFS in mind, but on S3-backed systems things like S3 list calls become crucial for job performance. Here is another thing to keep in mind when looking for improvements.

Suppose your query looks like this ( this applies to the hierarchical-partition load case ):


INSERT into table abc partition ( year, month, day ) select <col_list>, year , month, day from xyz WHERE year=2015 and month = '06' distribute by day ;


Before jumping into the problem and solution, it is important to understand the command flow in this case. With a dynamic partition insert, the MR job runs first and finishes loading data into the target location ( S3 in our case ). After the MR job completes, control is handed back to the Hive client. The client then scans the target location, fetches the list of existing partitions from the metastore database, and, based on the diff between the two, identifies which partitions need to be added.

Problem: 

Now if you look at the example query above, it is syntactically and semantically fine, but it leaves a huge task to finish after the MR job. Since year, month and day are all unknown to the partition loader, it scans the S3 location from the top, which means all years, months and days. This becomes a problem if you have data for 10 years and each day holds multiple files, because it simply adds more S3 list calls. We have sometimes seen the MR job finish in just 5 minutes while this whole operation takes about an hour.

Solution:

Notice that in the above query you actually know part of your target, since the year and month values are fixed. So instead of letting Hive search and diff the entire partition location, you can help it by changing the query as follows ( note that the static year and month values also drop out of the select list ):

INSERT into table abc partition ( year=2015, month='06', day ) select <col_list>, day from xyz WHERE year=2015 and month = '06' distribute by day ;

This way the Hive partition loader starts scanning from <base_s3_loc>/year=2015/month=06, which reduces the number of S3 list calls substantially.
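Once the load finishes, you can sanity-check that only the intended partitions were registered. SHOW PARTITIONS accepts a partial partition spec on reasonably recent Hive versions, so with the table and values from the example above:

SHOW PARTITIONS abc PARTITION( year=2015, month='06' );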
