hive任务优化——小文件合并相关参数(代码片段)

疯狂哈丘 疯狂哈丘     2022-12-12     688

关键词:

文章目录

一、Hive提供的文件合并功能

熟悉hdfs的都知道,hdfs不建议存储小文件,原因是大量的小文件会给namenode带来太大的负担。因此,我们在使用hdfs过程中应该尽量保证输出到hdfs的文件不会有大量零碎的小文件。

在大多数情况下,hive都是将数据文件存储在hdfs上。当hive执行类似insert overwrite directory或者insert table等语句时,都会往hdfs写文件。具体生成的文件数量和文件大小则和任务的数据量、执行计划有关。如果我们可以准确的知道一个任务的输入数据量以及对应的执行计划,那么我们则可以通过调整hive任务的map数量和reduce数量来控制最终落地到hdfs的文件数量(一个任务落地的数据量大小肯定是固定的,我们能调整的只有文件数量)。这样就可以尽量保证这个任务落地的文件不会是零碎的小文件了。

但是作为集群管理者,我们肯定无法知道所有任务的数据量和执行计划,也不可能对每个任务进行这些参数调整的优化。Hive也考虑到这一点,因此在早期的版本中就加入了相关参数来控制任务输出的文件数量。

参数名作用
hive.merge.mapfiles是否在纯Map的任务(没有reduce task)后开启小文件合并
hive.merge.mapredfiles是否在mapreduce任务后开启小文件合并
hive.merge.sparkfiles是否在hive on spark任务后开启小文件合并
hive.merge.smallfiles.avgsize如果原先输出的文件平均大小小于这个值,则开启小文件合并。比如输出原本有100个文件,总大小1G,那平均每个文件大小只有10M,如果我们这个参数设置为16M,这时就会开启文件合并
hive.merge.size.per.task开启小文件合并后,预期的一个合并文件的大小。比如原先的总大小有1G,我们预期一个文件256M的话,那么最终经过合并会生成4个文件。

hive文件合并的实现原理

Hive在任务结束后,不同的引擎根据不同的参数来判断是否需要进行文件合并检查。比如如果使用的spark引擎,则需要设置hive.merge.sparkfiles为true,如果mapreduce引擎,则设置hive.merge.mapredfiles为true。

是否开启文件合并检查我们可以通过hive的执行计划来判断。我们先将hive.merge.sparkfiles设置为false,然后执行以下语句输出执行计划:

explain insert overwrite directory '/tmp/' select * from yjbtest.test;

最后得到如下执行计划:

+----------------------------------------------------+
|                      Explain                       |
+----------------------------------------------------+
| STAGE DEPENDENCIES:                                |
|   Stage-1 is a root stage                          |
|   Stage-0 depends on stages: Stage-1               |
|                                                    |
| STAGE PLANS:                                       |
|   Stage: Stage-1                                   |
|     Spark                                          |
|       DagName: hadoopuser_20190909151818_944f4632-cd15-4263-ad0e-f210aff3d6b3:3028 |
|       Vertices:                                    |
|         Map 1                                      |
|             Map Operator Tree:                     |
|                 TableScan                          |
|                   alias: test                      |
|                   Statistics: Num rows: 141 Data size: 903 Basic stats: COMPLETE Column stats: NONE |
|                   Select Operator                  |
|                     expressions: id (type: int), name (type: string) |
|                     outputColumnNames: _col0, _col1 |
|                     Statistics: Num rows: 141 Data size: 903 Basic stats: COMPLETE Column stats: NONE |
|                     File Output Operator           |
|                       compressed: false            |
|                       Statistics: Num rows: 141 Data size: 903 Basic stats: COMPLETE Column stats: NONE |
|                       table:                       |
|                           input format: org.apache.hadoop.mapred.TextInputFormat |
|                           output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat |
|                           serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe |
|                                                    |
|   Stage: Stage-0                                   |
|     Move Operator                                  |
|       files:                                       |
|           hdfs directory: true                     |
|           destination: /tmp                        |
|                                                    |
+----------------------------------------------------+

上面的执行计划也很简单,就是分成两个阶段,Stage-1完成后会将结果输出到某个临时目录,之后Stage-0直接Move到最终的输出目录即可(不会进行小文件合并)。

之后我们把hive.merge.sparkfiles参数设置为true,再输出执行计划,会得到:

+----------------------------------------------------+
|                      Explain                       |
+----------------------------------------------------+
| STAGE DEPENDENCIES:                                |
|   Stage-1 is a root stage                          |
|   Stage-6 depends on stages: Stage-1 , consists of Stage-3, Stage-2, Stage-4 |
|   Stage-3                                          |
|   Stage-0 depends on stages: Stage-3, Stage-2, Stage-5 |
|   Stage-2                                          |
|   Stage-4                                          |
|   Stage-5 depends on stages: Stage-4               |
|                                                    |
| STAGE PLANS:                                       |
|   Stage: Stage-1                                   |
|     Spark                                          |
|       DagName: hadoopuser_20190909152045_69d1724e-184a-4883-b38d-3aad1334f787:3029 |
|       Vertices:                                    |
|         Map 1                                      |
|             Map Operator Tree:                     |
|                 TableScan                          |
|                   alias: test                      |
|                   Statistics: Num rows: 141 Data size: 903 Basic stats: COMPLETE Column stats: NONE |
|                   Select Operator                  |
|                     expressions: id (type: int), name (type: string) |
|                     outputColumnNames: _col0, _col1 |
|                     Statistics: Num rows: 141 Data size: 903 Basic stats: COMPLETE Column stats: NONE |
|                     File Output Operator           |
|                       compressed: false            |
|                       Statistics: Num rows: 141 Data size: 903 Basic stats: COMPLETE Column stats: NONE |
|                       table:                       |
|                           input format: org.apache.hadoop.mapred.TextInputFormat |
|                           output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat |
|                           serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe |
|                                                    |
|   Stage: Stage-6                                   |
|     Conditional Operator                           |
|                                                    |
|   Stage: Stage-3                                   |
|     Move Operator                                  |
|       files:                                       |
|           hdfs directory: true                     |
|           destination: hdfs://test45cluster/tmp/.hive-staging_hive_2019-09-09_15-20-45_204_1372713564087112462-2811/-ext-10000 |
|                                                    |
|   Stage: Stage-0                                   |
|     Move Operator                                  |
|       files:                                       |
|           hdfs directory: true                     |
|           destination: /tmp                        |
|                                                    |
|   Stage: Stage-2                                   |
|     Spark                                          |
|       DagName: hadoopuser_20190909152045_69d1724e-184a-4883-b38d-3aad1334f787:3030 |
|       Vertices:                                    |
|         Spark Merge File Work                      |
|             Map Operator Tree:                     |
|                 TableScan                          |
|                   File Output Operator             |
|                     compressed: false              |
|                     table:                         |
|                         input format: org.apache.hadoop.mapred.TextInputFormat |
|                         output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat |
|                         serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe |
|                                                    |
|   Stage: Stage-4                                   |
|     Spark                                          |
|       DagName: hadoopuser_20190909152045_69d1724e-184a-4883-b38d-3aad1334f787:3030 |
|       Vertices:                                    |
|         Spark Merge File Work                      |
|             Map Operator Tree:                     |
|                 TableScan                          |
|                   File Output Operator             |
|                     compressed: false              |
|                     table:                         |
|                         input format: org.apache.hadoop.mapred.TextInputFormat |
|                         output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat |
|                         serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe |
|                                                    |
|   Stage: Stage-5                                   |
|     Move Operator                                  |
|       files:                                       |
|           hdfs directory: true                     |
|           destination: hdfs://test45cluster/tmp/.hive-staging_hive_2019-09-09_15-20-45_204_1372713564087112462-2811/-ext-10000 |
|                                                    |
+----------------------------------------------------+

我们看到多了很多Stage,这些Stage就是和文件合并相关的。其中关键是Stage-6(Stage 1执行完后执行它),它是一个ConditionalTask,主要根据一些条件判断要执行哪个Stage:

  • Stage 3:不执行文件合并,直接Move数据到目标目录
  • Stage 2:开启一个Job,读取之前临时目录的数据,然后输出到另一个临时目录
  • Stage 4:开启一个Job,读取之前临时目录的数据,然后输出到另一个临时目录(和Stage 2不同的是它后面还接了一个Stage 5。这种情况主要是在动态分区时,有些分区目录的文件已经不需要合并了,所以把整个合并流程分成merge and move)

总之,如果根据参数 hive.merge.smallfiles.avgsize 发现需要做小文件合并,则开启一个新的Job进行小文件合并。这就是Hive进行文件合并的真相。

二、文件合并在Hive on Spark中的失效问题

Hive中的hive.merge.size.per.task参数是用来控制合并后的文件的预期大小的,但是我们在hive on spark的测试过程中发现这个参数并没有生效,而在hive on mr中却可以生效。

一开始怀疑是不是hive on spark中没开启文件合并的job,但是看了hive的执行日志和job的执行记录,都表明已经开启了一个新的Job来合并文件,只是这个合并Job的map task数量和预期的有出入。

我们已经知道Hive会提交一个新的Job来合并文件,很明显,这个Job应该是一个Map Only的任务,因此Map task的数量就是最终文件的数量。但是通过实际观察,在Hive on Spark中,无论hive.merge.size.per.task参数怎么调整,合并Job的Map task数量都不会调整。

既然和Map task的数量有关,那么我们可以回想一下正常情况下,我们要怎么调整一个Job的Map task数量。主要和以下参数有关:

  • maxSize: 通过mapreduce.input.fileinputformat.split.maxsize参数设置
  • minSizePerNode: 通过mapreduce.input.fileinputformat.split.minsize.per.node参数设置
  • minSizePerRack:通过mapreduce.input.fileinputformat.split.minsize.per.rack参数设置

具体的可以看这篇博客:【Hive任务优化】—— Map、Reduce数量调整

所以正常情况下,Map数量和hive.merge.size.per.task是没有关系的。那么hive.merge.size.per.task是怎么作用到这几个参数上,来修改合并Job的Map task数量呢?

因为Hive on Mr是可以生效的,因此看了下相关代码。发现Hive在确定需要合并时,会往相关的Task的MapWork中设置hive.merge.size.per.task的值。之后提交MapReduce任务时,会获取这个值,然后设置到Job参数中(就是设置影响map task的那几个参数):

//ExecDriver.java
public static void propagateSplitSettings(JobConf job, MapWork work) 
  if (work.getNumMapTasks() != null) 
    job.setNumMapTasks(work.getNumMapTasks().intValue());
  
 
  if (work.getMaxSplitSize() != null) 
    HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMAXSPLITSIZE, work.getMaxSplitSize().longValue());
  
 
  if (work.getMinSplitSize() != null) 
    HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZE, work.getMinSplitSize().longValue());
  
 
  if (work.getMinSplitSizePerNode() != null) 
    HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERNODE, work.getMinSplitSizePerNode().longValue());
  
 
  if (work.getMinSplitSizePerRack() != null) 
    HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERRACK, work.getMinSplitSizePerRack().longValue());
  

后面又找了下Hive on spark提交Job的相关代码,并没有找到设置相关参数的代码

解决方案

修改Hive源码,在Hive on spark提交Job的代码加上设置相关参数的代码:

//RemoteHiveSparkClient.java类的submit方法
//update the credential provider location in the jobConf
HiveConfUtil.updateJobCredentialProviders(jobConf);
//插入代码段开始
try 
  //从work中获取相关的参数,提前设置相关参数。不然spark在进行文件合并时不会生效
  MapWork mapWork = (MapWork) sparkWork.getAllWork().get(0);
  propagateSplitSettings(jobConf,mapWork);
 catch (Exception e) 
  e.printStackTrace();

//插入代码段结束
// Create temporary scratch dir
final Path emptyScratchDir = ctx.getMRTmpPath();

三、开启文件合并的优缺点

我们知道,文件合并需要开启一个新的Job,因此任务整体性能肯定会降低,使用的资源也会变多,这是文件合并的缺点

但是从长远来看,还是开启文件合并比较好。因为过多的小文件如果不断积累,会给hdfs集群带来难以想象的负担。当集群因为小文件数量达到瓶颈,要处理起这些小文件也需要一定的工作量。与其这样,不如提前开启文件合并,避免过多的小文件生成。

或者使用折中方案,我们可以调低hive.merge.smallfiles.avgsize的值,这样就可以只针对那些真正很零散的小文件进行合并。

hivemapreduce性能优化(代码片段)

一、Hive任务创建文件数优化1.1Map端文件合并减少Map任务数量一般来说,HDFS的默认文件块大小是128M,如果在Hive执行任务时,发现Map端的任务过多,且执行时间多数不超过一分钟,建议通过参数,划分(split)文件的大小,合并小文... 查看详情

hivemapreduce性能优化(代码片段)

一、Hive任务创建文件数优化1.1Map端文件合并减少Map任务数量一般来说,HDFS的默认文件块大小是128M,如果在Hive执行任务时,发现Map端的任务过多,且执行时间多数不超过一分钟,建议通过参数,划分(split)文件的大小,合并小文... 查看详情

hive优化(十三)-小文件进行合并

参考技术A在Map执行前合并小文件,减少Map数:1)参数设置这个参数表示执行前进行小文件合并,前面三个参数确定合并文件块的大小,大于文件块大小128m的,按照128m来分隔,小于128m,大于100m的,按照100m来分隔,把那些小于10... 查看详情

hive优化之小文件合并

...文件来消除这样的影响:sethive.merge.mapfiles=true##在maponly的任务结束时合并小文件sethive.merge.mapredfiles=false##true时在MapReduce的任务结束时合并小文件sethive.merge.size.per.task=256*1000*1000##合并文件的大小setmapred.max.split.size=256000000;##每个Map... 查看详情

hive如何处理大量小文件

...,每一个map都会开启一个JVM虚拟机,每个虚拟机都要创建任务,执行任务,这些流程都会造成大量的资源浪费,严重影响性能2在HDFS中,每个小文件约占150byte,如果小文件过多则会占用大量的内存。这样namenode内存容量严重制约... 查看详情

hive参数优化和数据倾斜(代码片段)

...据倾斜原因:key分布不均匀,数据重复表现:任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。因为其处 查看详情

texthive优化小文件合并(代码片段)

查看详情

hive:小文件合并

...立的jvm实例,其开启和停止的开销可能会大大超过实际的任务处理时间。同时我们知道hive输出最终是mr的输出,即reducer(或mapper)的输出,有多少个reducer(mapper)输出就会生成多少个输出文件,根据shuffle/sort的原理,每个文件... 查看详情

hive优化笔记(代码片段)

目录减少job数。3.配置角度优化3.1列裁剪4.2分区裁剪3.3JOIN操作3.4MAPJOIN操作3.5GROUPBY操作3.6合并小文件5.程序角度优化5.1熟练使用SQL提高查询5.2无效ID在关联时的数据倾斜问题5.3不同数据类型关联产生的倾斜问题5.4利用Hive对UNIONALL优... 查看详情

hive优化

1.hive小文件合并cdhive/conf/hive-default输出合并合并输出小文件。输出时,若是太多小文件,每个小文件会与一个block进行对应,而block存在的意义是为了方便在namenode中存储,那么过多的block将会充斥namenode的表中,待集群规模增大... 查看详情

hive相关优化

Hive相关优化1:首先模型设计能解决很大问题,2:其次就是解决数据倾斜,减少job数,3:设置合理的maptask和reducetask数可以有效提高性能,4:数据量较大的时候,尽量不要用count(distinct)(会产生数据倾斜)&#... 查看详情

hive小文件合并设置参数

...运算速度,我们会增加Reducer的数量,Hive本身也会做类似优化——Reducer数量等于源数据的量除以hive.exec.reducer 查看详情

对hive数仓表进行高效小文件合并

...会开很多map,一个map开一个JVM去执行,所以这些任务的初始化,启动,执行会浪费大量的资源,严重影响性能小文件会给底层文件系统带来很大压力,如在HDFS中,每个小文件对象约占150byte,如果小... 查看详情

关于hive当中的map输入数据和reduce输出数据的合并。(代码片段)

最近在做要执行很复杂的sql.然后在文件输出的时候出现了一堆小文件:为啥要对小文件进行合并一句话总结为:文件数目过多,增加namenode的压力。因为每一个文件的元数据信息都是存在namenode上面的。所以要减少小文件的数据... 查看详情

hadoop调优(代码片段)

...化方法2.1数据输入(1)合并小文件:在执行mr任务前将小文件进行合并,大量的小文件会产生大量的map任务, 查看详情

hive优化之hive的配置参数优化

...数和HiveSQL的执行等。本文主要从建表配置参数方面对Hive优化进行讲解。1.创建一个普通表tabletest_user1(idint,namestring,codestring,code_idstring)ROWFORMATDELIMITEDFIELDSTERMINATED BY',';2.查看这张表的信息DESCRIBEFORMATTED test_user1;我们从... 查看详情

hiveontez小文件合并的技术调研(代码片段)

...度依赖租来的阿里云DataPhin,那么最常用的离线跑批任务还是要使用HQL【也就是HiveOnTez】。HQL上手门槛极低,之前搞那种Oracle数据库开发的也可以一周内升任SqlBoy岗位,这是其一;PySpark任务或者用Java/Scala打Jar包的... 查看详情

hive中控制文件生产个数(代码片段)

...如果reducer设置过多,则会导致产生很多小文件,这样对任务的执行以及集群都不太好.通常情况下这两个参数都不需要手动设置,Hive会根据文件的大小和任务的情况自动计算,但是某些特殊情况下可能需要进行调优,下面列举两... 查看详情