2023年大数据面试通关文牒系列篇

2023-11-11

大数据面试通关文牒系列篇

第二篇:Hadoop生态链

Round 1: HIVE

HIVE 基础篇

1、Hive内部表和外部表的区别
未被external修饰的是内部表,被external修饰的为外部表。

区别:

内部表数据由Hive自身管理,外部表数据由HDFS管理;

内部表数据存储的位置是hive.metastore.warehouse.dir(默认:/user/hive/warehouse),外部表数据的存储位置由自己制定(如果没有LOCATION,Hive将在HDFS上的/user/hive/warehouse文件夹下以外部表的表名创建一个文件夹,并将属于这个表的数据存放在这里);

删除内部表会直接删除元数据(metadata)及存储数据;删除外部表仅仅会删除元数据,HDFS上的文件并不会被删除。

2、Hive有索引吗
Hive支持索引(3.0版本之前),但是Hive的索引与关系型数据库中的索引并不相同,比如,Hive不支持主键或者外键。并且Hive索引提供的功能很有限,效率也并不高,因此Hive索引很少使用。

索引适用的场景:

适用于不更新的静态字段。以免总是重建索引数据。每次建立、更新数据后,都要重建索引以构建索引表。

Hive索引的机制如下:

hive在指定列上建立索引,会产生一张索引表(Hive的一张物理表),里面的字段包括:索引列的值、该值对应的HDFS文件路径、该值在文件中的偏移量。

Hive 0.8版本后引入bitmap索引处理器,这个处理器适用于去重后,值较少的列(例如,某字段的取值只可能是几个枚举值) 因为索引是用空间换时间,索引列的取值过多会导致建立bitmap索引表过大。

注意:Hive中每次有数据时需要及时更新索引,相当于重建一个新表,否则会影响数据查询的效率和准确性,Hive官方文档已经明确表示Hive的索引不推荐被使用,在新版本的Hive中已经被废弃了。

扩展:Hive是在0.7版本之后支持索引的,在0.8版本后引入bitmap索引处理器,在3.0版本开始移除索引的功能,取而代之的是2.3版本开始的物化视图,自动重写的物化视图替代了索引的功能。

3、运维如何对Hive进行调度
将hive的sql定义在脚本当中;

使用azkaban或者oozie进行任务的调度;

监控任务调度页面。

4、ORC、Parquet等列式存储的优点
ORC和Parquet都是高性能的存储方式,这两种存储格式总会带来存储和性能上的提升。

Parquet:

Parquet支持嵌套的数据模型,类似于Protocol Buffers,每一个数据模型的schema包含多个字段,每一个字段有三个属性:重复次数、数据类型和字段名。
重复次数可以是以下三种:required(只出现1次),repeated(出现0次或多次),optional(出现0次或1次)。每一个字段的数据类型可以分成两种: group(复杂类型)和primitive(基本类型)。

Parquet中没有Map、Array这样的复杂数据结构,但是可以通过repeated和group组合来实现的。

由于Parquet支持的数据模型比较松散,可能一条记录中存在比较深的嵌套关系,如果为每一条记录都维护一个类似的树状结可能会占用较大的存储空间,因此Dremel论文中提出了一种高效的对于嵌套数据格式的压缩算法:Striping/Assembly算法。通过Striping/Assembly算法,parquet可以使用较少的存储空间表示复杂的嵌套格式,并且通常Repetition level和Definition level都是较小的整数值,可以通过RLE算法对其进行压缩,进一步降低存储空间。

Parquet文件是以二进制方式存储的,是不可以直接读取和修改的,Parquet文件是自解析的,文件中包括该文件的数据和元数据。

ORC:

ORC文件是自描述的,它的元数据使用Protocol Buffers序列化,并且文件中的数据尽可能的压缩以降低存储空间的消耗。

和Parquet类似,ORC文件也是以二进制方式存储的,所以是不可以直接读取,ORC文件也是自解析的,它包含许多的元数据,这些元数据都是同构ProtoBuffer进行序列化的。

ORC会尽可能合并多个离散的区间尽可能的减少I/O次数。

ORC中使用了更加精确的索引信息,使得在读取数据时可以指定从任意一行开始读取,更细粒度的统计信息使得读取ORC文件跳过整个row group,ORC默认会对任何一块数据和索引信息使用ZLIB压缩,因此ORC文件占用的存储空间也更小。

在新版本的ORC中也加入了对Bloom Filter的支持,它可以进一 步提升谓词下推的效率,在Hive 1.2.0版本以后也加入了对此的支 持。

5、数据建模用的哪些模型?

  1. 星型模型

星形模式(Star Schema)是最常用的维度建模方式。星型模式是以事实表为中心,所有的维度表直接连接在事实表上,像星星一样。 星形模式的维度建模由一个事实表和一组维表成,且具有以下特点:

a. 维表只和事实表关联,维表之间没有关联;

b. 每个维表主键为单列,且该主键放置在事实表中,作为两边连接的外键;

c. 以事实表为核心,维表围绕核心呈星形分布。

  1. 雪花模型

雪花模式(Snowflake Schema)是对星形模式的扩展。雪花模式的维度表可以拥有其他维度表的,虽然这种模型相比星型更规范一些,但是由于这种模型不太容易理解,维护成本比较高,而且性能方面需要关联多层维表,性能比星型模型要低。

  1. 星座模型

星座模式是星型模式延伸而来,星型模式是基于一张事实表的,而星座模式是基于多张事实表的,而且共享维度信息。前面介绍的两种维度建模方法都是多维表对应单事实表,但在很多时候维度空间内的事实表不止一个,而一个维表也可能被多个事实表用到。在业务发展后期,绝大部分维度建模都采用的是星座模式。

数仓建模详细介绍可查看:通俗易懂数仓建模

6、为什么要对数据仓库分层?

用空间换时间,通过大量的预处理来提升应用系统的用户体验(效率),因此数据仓库会存在大量冗余的数据。

如果不分层的话,如果源业务系统的业务规则发生变化将会影响整个数据清洗过程,工作量巨大。

通过数据分层管理可以简化数据清洗的过程,因为把原来一步的工作分到了多个步骤去完成,相当于把一个复杂的工作拆成了多个简单的工作,把一个大的黑盒变成了一个白盒,每一层的处理逻辑都相对简单和容易理解,这样我们比较容易保证每一个步骤的正确性,当数据发生错误的时候,往往我们只需要局部调整某个步骤即可。

数据仓库详细介绍可查看:万字详解整个数据仓库建设体系

7、使用过Hive解析JSON串吗

Hive处理json数据总体来说有两个方向的路走:

将json以字符串的方式整个入Hive表,然后通过使用UDF函数解析已经导入到hive中的数据,比如使用LATERAL VIEW json_tuple的方法,获取所需要的列名。

在导入之前将json拆成各个字段,导入Hive表的数据是已经解析过的。这将需要使用第三方的 SerDe。

详细介绍可查看:Hive解析Json数组超全讲解

8、sort by 和 order by 的区别

order by 会对输入做全局排序,因此只有一个reducer(多个reducer无法保证全局有序)只有一个reducer,会导致当输入规模较大时,需要较长的计算时间。

sort by不是全局排序,其在数据进入reducer前完成排序. 因此,如果用sort by进行排序,并且设置mapred.reduce.tasks>1, 则sort by只保证每个reducer的输出有序,不保证全局有序。

9、数据倾斜怎么解决

数据倾斜问题主要有以下几种:

空值引发的数据倾斜

不同数据类型引发的数据倾斜

不可拆分大文件引发的数据倾斜

数据膨胀引发的数据倾斜

表连接时引发的数据倾斜

确实无法减少数据量引发的数据倾斜

以上倾斜问题的具体解决方案可查看:Hive千亿级数据倾斜解决方案

注意:对于 left join 或者 right join 来说,不会对关联的字段自动去除null值,对于 inner join 来说,会对关联的字段自动去除null值。

小伙伴们在阅读时注意下,在上面的文章(Hive千亿级数据倾斜解决方案)中,有一处sql出现了上述问题(举例的时候原本是想使用left join的,结果手误写成了join)。此问题由公众号读者发现,感谢这位读者指正。

10、Hive 小文件过多怎么解决

  1. 使用 hive 自带的 concatenate 命令,自动合并小文件
    使用方法:

#对于非分区表alter table A concatenate;#对于分区表alter table B partition(day=20201224) concatenate;

注意: 1、concatenate 命令只支持 RCFILE 和 ORC 文件类型。
2、使用concatenate命令合并小文件时不能指定合并后的文件数量,但可以多次执行该命令。
3、当多次使用concatenate后文件数量不在变化,这个跟参数
mapreduce.input.fileinputformat.split.minsize=256mb
的设置有关,可设定每个文件的最小size。

  1. 调整参数减少Map数量
    设置map输入合并小文件的相关参数(执行Map前进行小文件合并):

在mapper中将多个文件合成一个split作为输入(CombineHiveInputFormat底层是Hadoop的CombineFileInputFormat方法):

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; – 默认

每个Map最大输入大小(这个值决定了合并后文件的数量):

set mapred.max.split.size=256000000; – 256M

一个节点上split的至少大小(这个值决定了多个DataNode上的文件是否需要合并):

set mapred.min.split.size.per.node=100000000; – 100M

一个交换机下split的至少大小(这个值决定了多个交换机上的文件是否需要合并):

set mapred.min.split.size.per.rack=100000000; – 100M

  1. 减少Reduce的数量
    reduce 的个数决定了输出的文件的个数,所以可以调整reduce的个数控制hive表的文件数量。

hive中的分区函数 distribute by 正好是控制MR中partition分区的,可以通过设置reduce的数量,结合分区函数让数据均衡的进入每个reduce即可:

#设置reduce的数量有两种方式,第一种是直接设置reduce个数set mapreduce.job.reduces=10;#第二种是设置每个reduce的大小,Hive会根据数据总大小猜测确定一个reduce个数set
hive.exec.reducers.bytes.per.reducer=5120000000; –
默认是1G,设置为5G#执行以下语句,将数据均衡的分配到reduce中set mapreduce.job.reduces=10;insert
overwrite table A partition(dt)select * from Bdistribute by rand();

对于上述语句解释:如设置reduce数量为10,使用 rand(), 随机生成一个数 x % 10 , 这样数据就会随机进入 reduce 中,防止出现有的文件过大或过小。

  1. 使用hadoop的archive将小文件归档
    Hadoop Archive简称HAR,是一个高效地将小文件放入HDFS块中的文件存档工具,它能够将多个小文件打包成一个HAR文件,这样在减少namenode内存使用的同时,仍然允许对文件进行透明的访问。

#用来控制归档是否可用set hive.archive.enabled=true;#通知Hive在创建归档时是否可以设置父目录set hive.archive.har.parentdir.settable=true;#控制需要归档文件的大小set
har.partfile.size=1099511627776;使用以下命令进行归档:ALTER TABLE A ARCHIVE
PARTITION(dt=‘2021-05-07’, hr=‘12’);对已归档的分区恢复为原文件:ALTER TABLE A
UNARCHIVE PARTITION(dt=‘2021-05-07’, hr=‘12’);

注意: 归档的分区可以查看不能 insert overwrite,必须先 unarchive

Hive 小文件问题具体可查看:解决hive小文件过多问题

11、Hive优化有哪些

  1. 数据存储及压缩:
    针对hive中表的存储格式通常有orc和parquet,压缩格式一般使用snappy。相比与textfile格式表,orc占有更少的存储。因为hive底层使用MR计算架构,数据流是hdfs到磁盘再到hdfs,而且会有很多次,所以使用orc数据格式和snappy压缩策略可以降低IO读写,还能降低网络传输量,这样在一定程度上可以节省存储,还能提升hql任务执行效率;

  2. 通过调参优化:
    并行执行,调节parallel参数;

调节jvm参数,重用jvm;

设置map、reduce的参数;开启strict mode模式;

关闭推测执行设置。

  1. 有效地减小数据集将大表拆分成子表;结合使用外部表和分区表。
  2. SQL优化
    大表对大表:尽量减少数据集,可以通过分区表,避免扫描全表或者全字段;

大表对小表:设置自动识别小表,将小表放入内存中去执行。

Hive优化详细剖析可查看:Hive企业级性能优化

12、Tez引擎优点?
Tez可以将多个有依赖的作业转换为一个作业,这样只需写一次HDFS,且中间节点较少,从而大大提升作业的计算性能。

Mr/tez/spark区别:

Mr引擎:多job串联,基于磁盘,落盘的地方比较多。虽然慢,但一定能跑出结果。一般处理,周、月、年指标。

Spark引擎:虽然在Shuffle过程中也落盘,但是并不是所有算子都需要Shuffle,尤其是多算子过程,中间过程不落盘 DAG有向无环图。 兼顾了可靠性和效率。一般处理天指标。

Tez引擎:完全基于内存。 注意:如果数据量特别大,慎重使用。容易OOM。一般用于快速出结果,数据量比较小的场景。

Hive进阶篇

1. 大表Join小表产生的问题,如何解决

大表Join小表,必然使用MapJoin

Join因为空值导致长尾(key为空值是用随机值代替)

Join因为热点值导致长尾,也可以将热点数据和非热点数据分开处理,最后合并

2. UDF、UDAF、UDTF的区别

UDF:用户定义函数

UDAF:用户定义聚集函数

UDTF:用户定义表生成函数

3. Hive有哪些保存元数据的方式

  • 内存数据库derby,安装小,数据存在内存中,不稳定
  • MySql数据库,数据存储模式可以自己设置,持久化好,查看方便

4. Hive内部表和外部表的区别

默认创建内部表,创建外部表,需要加上external关键字修饰,还可通过location指定Hive仓库的路径

默认仓库路径:

内部表:内部表的默认创建路径在:/user/hive/warehouse/database.db/xm_testA
外部表:外部表的默认创建路径在:/user/hive/warehouse/database.db/xm_testB

drop表

内部表:内部表删除后会将元数据和路径下的文件都删除
外部表:外部表只删除元数据,不删除路径下的文件

load加载数据:

内部表:会把数据移动到自己指定的路径下
外部表:不会把数据移动到自己的数据仓库目录下,也因此证明外部表的数据不是由自己管理的
  • 做etl处理时,通常会选择内部表做中间表,因为清理时,会将HDFS上的文件同时删除
  • 如果怕误删数据,可以选择外部表,因为不会删除文件,方便恢复数据
  • 如果对数据的处理都是通过hql语句完成,选择内部表,如果有其他工具一同处理,选择外部表

在没有其他限制的情况下,优先使用外部表,因为:

  • 不会加载数据到hive,减少数据传输,还能共享
  • 不会对HDFS中的数据修改,不用担心数据损坏,删除表时只删除表结构,不删除数据

5. 生产环境中为什么建议使用外部表

答案见问题四

6. insert into和override write区别

insert into:将数据写入表中

override write:覆盖之前内容

7. Hive的判断函数有哪些?

--	if (boolean testCondition, T valueTrue, T valueFalseOrNull)
select sal,if(sal < 1500,if(sal < 3000 , 2 , 3 ) ) from emp

--	CASE WHEN a THEN b [wHEN c THEN d] * [ELSE e] END
--	将emp表的员工工资等级分类:0-1500,1500-3000,3000以上
select sal,case when sal < = 1500 then 1
								when sal < = 3000 then 2 
								else 3 end salleve
from emp

--	COALESCE(T v1, T v2, ···)	返回参数中的第一个非空值;如果所有值都为NULL,那么返回NULL
select sal,coalesce(comm, 0) from emp;

--	isnull(a) isnotnull(a)
select * from emp where isnull(comm);
select * from emp where isnotnull(comm);

--	nvl(t value, T default_value)
select empno,ename,job,sal + nvl(comm,0) sumsal from emp;

--	nullif(x,y)相等为空,否则为a
select nullif("b", "b"),nullif("b", "a");	

8. 简单描述一下Hive的功能,用Hive创建表有几种方式,Hive表有几种

Hive主要做离线分析

Hive建表有三种方式:

  • 直接建表
  • 查询建表:将自查询的结果存在新表里,一般用于中间表
  • Like建表法:常见结构完全相同的表,但无数据

hive表有两种:外部表与内部表

9. 线上业务每天产生的业务日志(压缩后 >= 3G),每天需加载到Hive的log表中,将每天产生的业务日志在压缩之后load到Hive的log表时,最好使用的压缩算法是哪个,并说明其原因

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DeHaTPVH-1688880016415)(img/Hive压缩算法对比图.png)]

选择lzo,因为该压缩算法可切分,压缩率比较高,解压缩速度很快,非常适合日志

10. 若Hive中建立分区仍不能优化查询效率,建表时如何优化

建立分区分桶表

11. union和union all的区别

union 去重

union all 不去重

12. 如何解决Hive数据倾斜的问题

  1. 大表与小表Join

    ​ MapJoin(hint指定小表、参数配置自动做MapJoin)

  2. 大表与大表Join

    ​ 空值单独提取出来分析后合并

    ​ 给空值在join时使用随机数打散

  3. Group By

    1. 开启Map端聚合配置
    2. 配置有GroupBy操作时数据倾斜进行负载均衡
  4. COUNT(DISTINCT)

    ​ 将count(distinct)替换为先Group By,再Count的方式

13. Hive性能优化常用的方法

Hive调优

14. 简述Delete、Drop、Trancate的区别

delete:删除数据

drop:删除表

truncate:摧毁表结构并重建

15. order by , sort by , distribute by , cluster by 的区别

  • **order by:**会对输入做全局排序,因此只有一个 reducer(多个 reducer无法保证全局有序),然而只有一个 Reducer会导致当输入规模较大时,消耗较长的计算时间

  • **sort by:**不是全局排序,其在数据进入 reducer前完成排序,因此,如果用 sort by进行排序并且设置 mapped. reduce. tasks >1,则 sort by只会保证每个 reducer的输出有序,并不保证全局有序。(全排序实现:先用 sortby保证每个 reducer输出有序,然后在进行 order by归并下前面所有的 reducer输出进行单个 reducer排序,实现全局有序。)

  • **distribute by:**控制在map端如何拆分数据给 reduce端的。hive会根据 distribute by后面列,对应 reduce的个数进行分发,默认是采用hash算法。sort by为每个 reduce产生一个排序文件。在有些情况下,你需要控制某个特定行应该到哪个 reducer,这通常是为了进行后续的聚集操作。distribute by刚好可以做这件事。因此, distribute by经常和 sort by配合使用。并且hive规定distribute by 语句要写在sort by语句之前

  • **cluster by:**当distribute by 和 sort by 所指定的字段相同时,即可以使用cluster by

    • 注意:cluster by指定的列只能是升序,不能指定asc和desc

16. Hive 里边字段的分隔符用的什么?为什么用\t?有遇到过字段里 边有\t的情况吗,怎么处理的?为什么不用 Hive 默认的分隔符,默认的分隔符是什么?

Hive默认的字段分隔符为ASCII码控制符\001(^A),建表的时候用fields terminated by ‘\001’

遇到过字段里面有\t的情况,自定义InputFormat,替换为其他分隔符再做后续处理

Hive 的 自定义 Inputformat

17. 分区分桶的区别,为什么要分区

  • **分区表:**原来的一个大表存储的时候分成不同的数据目录进行存储。如果说是单分区表,那么在表的目录下就只有一级子目录,如果说是多分区表,那么在表的目录下有多少分区就有多少级子目录。不管是单分区表,还是多分区表,在表的目录下,和非最终分区目录下是不能直接存储数据文件的
  • **分桶表:**原理和HashPartitioner一样,将Hive中的一张表的数据进行归纳分类的时候,归纳分类规则就是HashPartitioner(需要指定分桶字段,指定分成多少桶)
区别:

除了存储的格式不同之外,主要是作用:

  • 分区表:细化数据管理,缩小MR程序需要扫描的数据量
  • 分桶表:提高Join查询的效率,在一份数据会被经常用来做连接查询的时候建立分桶表,分桶字段就是连接字段;提高采样的效率
有了分区表为什么还要分桶
  1. 获得更高的查询处理效率。桶为表加上了额外的结构,Hive在处理有些查询时可以利用这个结构

  2. 使取样(sampling)更高效。在处理大规模数据集时,在开发和修改查询的阶段,如果能在数据集的一小部分数据上试运行查询,会带来很多的方便

    分桶是相对分区进行更细粒度的划分。分桶将表或者分区的某列值进行Hash值进行分区

    与分区不同的是,分区依据的不是真实数据表文件中的列,而是我们指定的伪列,但是分桶是依据数据表中真实的列而不是伪列

18. MapJoin的原理

MapJoin通常用于一个很小的表和一个大表进行join的场景,具体小表有多小,由参数hive.mapjoin.smalltable.filesize来决定,该参数表示小表的总大小,默认值为25000000字节,即25M。

Hive0.7之前,需要使用hint提示 /*+ mapjoin(table) */才会执行MapJoin,否则执行Common Join,但在0.7版本之后,默认自动会转换Map Join,由参数hive.auto.convert.join来控制,默认为true

MapJoin简单说就是在Map阶段将小表数据从 HDFS 上读取到内存中的哈希表中,读完后将内存中的哈希表序列化为哈希表文件,在下一阶段,当 MapReduce 任务启动时,会将这个哈希表文件上传到 Hadoop 分布式缓存中,该缓存会将这些文件发送到每个 Mapper 的本地磁盘上。因此,所有 Mapper 都可以将此持久化的哈希表文件加载回内存,并像之前一样进行 Join。顺序扫描大表完成Join。减少昂贵的shuffle操作及reduce操作

MapJoin分为两个阶段:

  • 通过MapReduce Local Task,将小表读入内存,生成HashTableFiles上传至Distributed Cache中,这里会HashTableFiles进行压缩。
  • MapReduce Job在Map阶段,每个Mapper从Distributed Cache读取HashTableFiles到内存中,顺序扫描大表,在Map阶段直接进行Join,将数据传递给下一个MapReduce任务

19. 在 hive 的 row_number 中 distribute by 和 partition by 的区别

row_number() over( partition by 分组的字段 order by 排序的字段) as rank(rank 可随意定义 表示排序的标识)

row_number() over( distribute by 分组的字段 sort by 排序的字段) as rank(rank 可随意定义 表示排序的标识)

partition by 只能和 order by 组合使用

distribute by 只能和 sort by 使用

20. Hive开发中遇到的问题

SQL优化:where条件在map端执行而不是在reduce端执行

21. 什么时候使用内部表,什么时候使用外部表

hive内部表和外部表的区别

内部表:加载数据到hive所在的hdfs目录,删除时,元数据和数据文件都删除

外部表:不加载数据到hive所在的hdfs目录,删除时,只删除表结构。

这样外部表相对来说更加安全些,数据组织也更加灵活,方便共享源数据

什么时候使用内部表,什么时候使用外部表
  • 每天采集的ng日志和埋点日志,在存储的时候建议使用外部表,因为日志数据是采集程序实时采集进来的,一旦被误删,恢复起来非常麻烦。而且外部表方便数据的共享。
  • 抽取过来的业务数据,其实用外部表或者内部表问题都不大,就算被误删,恢复起来也是很快的,如果需要对数据内容和元数据进行紧凑的管理, 那还是建议使用内部表
  • 在做统计分析时候用到的中间表,结果表可以使用内部表,因为这些数据不需要共享,使用内部表更为合适。并且很多时候结果分区表我们只需要保留最近3天的数据,用外部表的时候删除分区时无法删除数据

22. hive 都有哪些函数,你平常工作中用到哪些

Hive常用函数

23. 手写 sql,连续活跃用户

  1. 先把数据按照用户ID分组,根据登录日期排序
select user_id
       ,login_date
       ,row_number() over(partition by user_id order by login_date asc) as rn 
from
       wedw_dw.t_login_info
+----------+-------------+-----+--+
| user_id  | login_date  | rn  |
+----------+-------------+-----+--+
| user01   | 2018-02-28  | 1   |
| user01   | 2018-03-01  | 2   |
| user01   | 2018-03-02  | 3   |
| user01   | 2018-03-04  | 4   |
| user01   | 2018-03-05  | 5   |
| user01   | 2018-03-06  | 6   |
| user01   | 2018-03-07  | 7   |
| user02   | 2018-03-01  | 1   |
| user02   | 2018-03-02  | 2   |
| user02   | 2018-03-03  | 3   |
| user02   | 2018-03-06  | 4   |
+----------+-------------+-----+--+
  1. 用登录用户日期减去排序数字rn,得到的差值日期如果是相等的,则说明这几天是连续的
select
     t1.user_id
    ,t1.login_date
    ,date_sub(t1.login_date,rn) as date_diff
from 
(
  select
         user_id
         ,login_date
         ,row_number() over(partition by user_id order by login_date asc) as rn 
  from
      	wedw_dw.t_login_info
) t1;
+----------+-------------+-------------+--+
| user_id  | login_date  |  date_diff  |
+----------+-------------+-------------+--+
| user01   | 2018-02-28  | 2018-02-27  |
| user01   | 2018-03-01  | 2018-02-27  |
| user01   | 2018-03-02  | 2018-02-27  |
| user01   | 2018-03-04  | 2018-02-28  |
| user01   | 2018-03-05  | 2018-02-28  |
| user01   | 2018-03-06  | 2018-02-28  |
| user01   | 2018-03-07  | 2018-02-28  |
| user02   | 2018-03-01  | 2018-02-28  |
| user02   | 2018-03-02  | 2018-02-28  |
| user02   | 2018-03-03  | 2018-02-28  |
| user02   | 2018-03-06  | 2018-03-02  |
+----------+-------------+-------------+--+
  1. 根据user_id和日期差date_diff分组,最小登录日期即为连续登录的开始日期start_date,最大登录日期即为结束日期end_date,登录次数即为分组后的count(1)
select
       t2.user_id         as user_id
      ,count(1)           as times
      ,min(t2.login_date) as start_date
      ,max(t2.login_date) as end_date
from
(
    select
           t1.user_id
          ,t1.login_date
          ,date_sub(t1.login_date,rn) as date_diff
    from
    (
        select
               user_id
              ,login_date
              ,row_number() over(partition by user_id order by login_date asc) as rn 
         from
              wedw_dw.t_login_info
    ) t1
) t2
group by 
 t2.user_id
,t2.date_diff
having times >= 3
;
+----------+--------+-------------+-------------+--+
| user_id  | times  | start_date  |  end_date   |
+----------+--------+-------------+-------------+--+
| user01   | 3      | 2018-02-28   | 2018-03-02  |
| user01    | 4      | 2018-03-04  | 2018-03-07  |
| user02   | 3      | 2018-03-01   | 2018-03-03  |
+----------+--------+-------------+-------------+--+

24. left semi join 和 left join 区别

left join:两个表的全部字段均会展示出来

left semi join:仅展示A表字段,因为left semi join只传递表的join key给Map阶段

in:效果与left semi join一致

inner join:仅展示A表数据,但不会对结果去重

25. group by 为什么要排序

26. 说说印象最深的一次优化场景,hive 常见的优化思路

27. Hive的执行引擎,Spark和MR的区别

引擎是MR,基于磁盘进行计算,速度比较慢

引擎是Spark,基于内存进行计算,速度比较快

对于超大数据量的话,HiveOnSpark可能会有内存溢出的情况

28. Hive的Join底层MR是如何实现的

Hive的join底层mapreduce是如何实现的

29. 建好了外部表,用什么语句把数据文件加载到表中

  • 从本地导入:load data lacal inpath /home/data.log into table ods.test
  • 从HDFS导入:load data inpath /user/hive/warehouse/a.txt into ods.test

30. Hive的执行流程

  1. 用户提交查询等任务给Driver
  2. 编译器获得该用户的任务Plan
  3. 编译器Compiler根据用户任务去MetStore中获取需要的Hive的元数据信息
  4. 编译器Compiler得到元数据信息,对任务进行编译,先将HiveQL转换为抽象语法树,然后将抽象语法树转换成查询块,将查询块转化为逻辑的查询计划,重写逻辑查询计划,将逻辑计划转化为物理的计划(MapReduce),最后选择最佳的策略
  5. 将最终的计划提交给Driver
  6. Driver将计划Plan转交给ExecutionEngine去执行,获取元数据信息,提交给JobTracker或者SourceManager执行该任务,任务会直接读区HDFS中文件进行相应的操作
  7. 获取执行的结果
  8. 取得并返回执行结果

31. SQL语句的执行顺序

FROM—Where—Group By—Having—Select—Order By—Limit

32. On和Where的区别

  • 不考虑where条件下,left join 会把左表所有数据查询出来,on及其后面的条件仅仅会影响右表的数据(符合就显示,不符合全部为null)

  • 在匹配阶段,where子句的条件都不会被使用,仅在匹配阶段完成以后,where子句条件才会被使用,它将从匹配阶段产生的数据中检索过滤

  • 所以左连接关注的是左边的主表数据,不应该把on后面的从表中的条件加到where后,这样会影响原有主表中的数据

  • where后面:是先连接然生成临时查询结果,然后再筛选

    on后面:先根据条件过滤筛选,再连接生成临时查询结果

有谓词下推的情况下查询结果没有区别。没有谓词下推的情况下,在执行计划中会对on的条件内的子查询先进行过滤,最后再将结果根据where条件过滤,对于这种情况,应当尽可能的将子查询以及管理表的数据量减少以提升查询性能,避免笛卡尔积等情况。

从功能上来区分,on的结果是临时表,where是对on的临时结果做过滤

33. Hive中导入数据的4中方式

  • 从本地导入:load data local inpath /home/data.log into table ods.test
  • 从HDFS导入:load data inpath /user/hive/warehouse/a.txt into ods.test
  • 查询导入:create table tmp_test as select * from ods.test
  • 查询结果导入:insert into table tmp.test select * from ods.test

34. 下述SQL在Hive、SparkSql两种引擎中,执行流程分别是什么,区别是什么

Hive on Mapreduce

Hive的原理大家可以参考这篇大数据时代的技术hive:hive介绍,实际的一些操作可以看这篇笔记:新手的Hive指南,至于还有兴趣看Hive优化方法可以看看我总结的这篇Hive性能优化上的一些总结

Hive on Mapreduce执行流程

这里写图片描述

执行流程详细解析

  • Step 1:UI(user interface) 调用 executeQuery 接口,发送 HQL 查询语句给 Driver
  • Step 2:Driver 为查询语句创建会话句柄,并将查询语句发送给 Compiler, 等待其进行语句解析并生成执行计划
  • Step 3 and 4:Compiler 从 metastore 获取相关的元数据
  • Step 5:元数据用于对查询树中的表达式进行类型检查,以及基于查询谓词调整分区,生成计划
  • Step 6 (6.1,6.2,6.3):由 Compiler 生成的执行计划是阶段性的 DAG,每个阶段都可能会涉及到 Map/Reduce job、元数据的操作、HDFS 文件的操作,Execution Engine 将各个阶段的 DAG 提交给对应的组件执行。
  • Step 7, 8 and 9:在每个任务(mapper / reducer)中,查询结果会以临时文件的方式存储在 HDFS 中。保存查询结果的临时文件由 Execution Engine 直接从 HDFS 读取,作为从 Driver Fetch API 的返回内容。
Hive on Mapreduce特点
  1. 关系数据库里,表的加载模式是在数据加载时候强制确定的(表的加载模式是指数据库存储数据的文件格式),如果加载数据时候发现加载的数据不符合模式,关系数据库则会拒绝加载数据,这个就叫“写时模式”,写时模式会在数据加载时候对数据模式进行检查校验的操作。**Hive在加载数据时候和关系数据库不同,hive在加载数据时候不会对数据进行检查,也不会更改被加载的数据文件,而检查数据格式的操作是在查询操作时候执行,这种模式叫“读时模式”。**在实际应用中,写时模式在加载数据时候会对列进行索引,对数据进行压缩,因此加载数据的速度很慢,但是当数据加载好了,我们去查询数据的时候,速度很快。但是当我们的数据是非结构化,存储模式也是未知时候,关系数据操作这种场景就麻烦多了,这时候hive就会发挥它的优势。
  2. 关系数据库一个重要的特点是可以对某一行或某些行的数据进行更新、删除操作,hive不支持对某个具体行的操作,hive对数据的操作只支持覆盖原数据和追加数据。Hive也不支持事务和索引。更新、事务和索引都是关系数据库的特征,这些hive都不支持,也不打算支持,原因是hive的设计是海量数据进行处理,全数据的扫描时常态,针对某些具体数据进行操作的效率是很差的,对于更新操作,hive是通过查询将原表的数据进行转化最后存储在新表里,这和传统数据库的更新操作有很大不同。
  3. Hive也可以在hadoop做实时查询上做一份自己的贡献,那就是和hbase集成,hbase可以进行快速查询,但是hbase不支持类SQL的语句,那么此时hive可以给hbase提供sql语法解析的外壳,可以用类sql语句操作hbase数据库。
  4. Hive可以认为是MapReduce的一个封装、包装。Hive的意义就是在业务分析中将用户容易编写、会写的Sql语言转换为复杂难写的MapReduce程序,从而大大降低了Hadoop学习的门槛,让更多的用户可以利用Hadoop进行数据挖掘分析。

与传统数据库之间对比—From:Hive和传统数据库进行比较

比较项 SQL HiveQL
ANSI SQL 支持 不完全支持
更新 UPDATE\INSERT\DELETE insert OVERWRITE\INTO TABLE
事务 支持 不支持
模式 写模式 读模式
数据保存 块设备、本地文件系统 HDFS
延时
多表插入 不支持 支持
子查询 完全支持 只能用在From子句中
视图 Updatable Read-only
可扩展性
数据规模

SparkSQL
SparkSQL简介

SparkSQL的前身是Shark,给熟悉RDBMS但又不理解MapReduce的技术人员提供快速上手的工具,hive应运而生,它是当时唯一运行在Hadoop上的SQL-on-hadoop工具。但是MapReduce计算过程中大量的中间磁盘落地过程消耗了大量的I/O,降低的运行效率,为了提高SQL-on-Hadoop的效率,Shark应运而生,但又因为Shark对于Hive的太多依赖(如采用Hive的语法解析器、查询优化器等等),2014年spark团队停止对Shark的开发,将所有资源放SparkSQL项目上

img

其中SparkSQL作为Spark生态的一员继续发展,而不再受限于Hive,只是兼容Hive;而Hive on Spark是一个Hive的发展计划,该计划将Spark作为Hive的底层引擎之一,也就是说,Hive将不再受限于一个引擎,可以采用Map-Reduce、Tez、Spark等引擎。

  • SparkSQL的两个组件
  1. SQLContext:Spark SQL提供SQLContext封装Spark中的所有关系型功能。可以用之前的示例中的现有SparkContext创建SQLContext。
  2. DataFrame:DataFrame是一个分布式的,按照命名列的形式组织的数据集合。DataFrame基于R语言中的data frame概念,与关系型数据库中的数据库表类似。通过调用将DataFrame的内容作为行RDD(RDD of Rows)返回的rdd方法,可以将DataFrame转换成RDD。可以通过如下数据源创建DataFrame:已有的RDD、结构化数据文件、JSON数据集、Hive表、外部数据库。
SparkSQL运行架构

类似于关系型数据库,SparkSQL也是语句也是由Projection(a1,a2,a3)、Data Source(tableA)、Filter(condition)组成,分别对应sql查询过程中的Result、Data Source、Operation,也就是说SQL语句按Operation–>Data Source–>Result的次序来描述的。

img

当执行SparkSQL语句的顺序

  1. 对读入的SQL语句进行解析(Parse),分辨出SQL语句中哪些词是关键词(如SELECT、FROM、WHERE),哪些是表达式、哪些是Projection、哪些是Data Source等,从而判断SQL语句是否规范;
    • Projection:简单说就是select选择的列的集合,参考:SQL Projection
  2. 将SQL语句和数据库的数据字典(列、表、视图等等)进行绑定(Bind),如果相关的Projection、Data Source等都是存在的话,就表示这个SQL语句是可以执行的;
  3. 一般的数据库会提供几个执行计划,这些计划一般都有运行统计数据,数据库会在这些计划中选择一个最优计划(Optimize);
  4. 计划执行(Execute),按Operation–>Data Source–>Result的次序来进行的,在执行过程有时候甚至不需要读取物理表就可以返回结果,比如重新运行刚运行过的SQL语句,可能直接从数据库的缓冲池中获取返回结果。

解析方式如下

在这里插入图片描述

From:https://www.linkedin.com/pulse/hive-spark-vs-sparksql-sumit-vyas

Hive on Spark

hive on Spark是由Cloudera发起,由Intel、MapR等公司共同参与的开源项目,其目的是把Spark作为Hive的一个计算引擎,将Hive的查询作为Spark的任务提交到Spark集群上进行计算。通过该项目,可以提高Hive查询的性能,同时为已经部署了Hive或者Spark的用户提供了更加灵活的选择,从而进一步提高Hive和Spark的普及率。

在hive中使用以下语句开启;当然引擎还可以使用tez,一样的方式
hive> set hive.execution.engine=spark;

解析方式如下:
https://www.linkedin.com/pulse/hive-spark-vs-sparksql-sumit-vyas

Hive on Spark与SparkSql的区别

hive on spark大体与SparkSQL结构类似,只是SQL解析器不同,但是计算引擎都是spark!(比如sparksql的解析过程中多了很多优化,cbo这类的)敲黑板!这才是重点!

我们来看下,在pyspark中使用Hive on Spark是中怎么样的体验

#初始化Spark SQL
#导入Spark SQL
from pyspark.sql import HiveContext,Row
# 当不能引入Hive依赖时
# from pyspark.sql import SQLContext,Row
# 注意,上面那一点才是关键的,他两来自于同一个包,你们区别能有多大


hiveCtx = HiveContext(sc)	#创建SQL上下文环境
input = hiveCtx.jsonFile(inputFile)	  #基本查询示例
input.registerTempTable("tweets")	#注册输入的SchemaRDD(SchemaRDD在Spark 1.3版本后已经改为DataFrame)
#依据retweetCount(转发计数)选出推文
topTweets = hiveCtx.sql("SELECT text,retweetCount FROM tweets ORDER BY retweetCount LIMIT 10")
12345678910111213

SparkSQL和Hive On Spark都是在Spark上实现SQL的解决方案。Spark早先有Shark项目用来实现SQL层,不过后来推翻重做了,就变成了SparkSQL。这是Spark官方Databricks的项目,Spark项目本身主推的SQL实现。Hive On Spark比SparkSQL稍晚。Hive原本是没有很好支持MapReduce之外的引擎的,而Hive On Tez项目让Hive得以支持和Spark近似的Planning结构(非MapReduce的DAG)。所以在此基础上,Cloudera主导启动了Hive On Spark。这个项目得到了IBM,Intel和MapR的支持(但是没有Databricks)。—From SparkSQL与Hive on Spark的比较


Hive on Mapreduce和SparkSQL使用场景
Hive on Mapreduce场景
  • Hive的出现可以让那些精通SQL技能、但是不熟悉MapReduce 、编程能力较弱与不擅长Java语言的用户能够在HDFS大规模数据集上很方便地利用SQL 语言查询、汇总、分析数据,毕竟精通SQL语言的人要比精通Java语言的多得多
  • Hive适合处理离线非实时数据
SparkSQL场景
  • Spark既可以运行本地local模式,也可以以Standalone、cluster等多种模式运行在Yarn、Mesos上,还可以运行在云端例如EC2。此外,Spark的数据来源非常广泛,可以处理来自HDFS、HBase、 Hive、Cassandra、Tachyon上的各种类型的数据。
  • 实时性要求或者速度要求较高的场所
Hive on Mapreduce和SparkSQL性能对比

具体实验参见:Spark SQL & Spark Hive编程开发, 并和Hive执行效率对比

结论:sparksql和hive on spark时间差不多,但都比hive on mapreduce快很多,官方数据认为spark会被传统mapreduce快10-100倍

Hive vs Hive on Spark vs Sparksql vs RDD

对比试验详见:https://hivevssparksql.wordpress.com/


35. Hive的执行计划(explain)

HIVE提供了EXPLAIN命令来展示一个查询的执行计划,这个执行计划对于我们了解底层原理,hive 调优,排查数据倾斜等很有帮助

语法如下:

EXPLAIN [EXTENDED|CBO|AST|DEPENDENCY|AUTHORIZATION|LOCKS|VECTORIZATION|ANALYZE] query

explain 后面可以跟以下可选参数,注意:这几个可选参数不是 hive 每个版本都支持的

  1. EXTENDED:加上 extended 可以输出有关计划的额外信息。这通常是物理信息,例如文件名。这些额外信息对我们用处不大
  2. CBO:输出由Calcite优化器生成的计划。CBO 从 hive 4.0.0 版本开始支持
  3. AST:输出查询的抽象语法树。AST 在hive 2.1.0 版本删除了,存在bug,转储AST可能会导致OOM错误,将在4.0.0版本修复
  4. DEPENDENCY:dependency在EXPLAIN语句中使用会产生有关计划中输入的额外信息。它显示了输入的各种属性
  5. AUTHORIZATION:显示所有的实体需要被授权执行(如果存在)的查询和授权失败
  6. LOCKS:这对于了解系统将获得哪些锁以运行指定的查询很有用。LOCKS 从 hive 3.2.0 开始支持
  7. VECTORIZATION:将详细信息添加到EXPLAIN输出中,以显示为什么未对Map和Reduce进行矢量化。从 Hive 2.3.0 开始支持
  8. ANALYZE:用实际的行数注释计划。从 Hive 2.2.0 开始支持

在 hive cli 中输入以下命令(hive 2.3.7):

explain select sum(id) from test1;

得到如下结果:

STAGE DEPENDENCIES:	--各个Stage之间的依赖性
  Stage-1 is a root stage							--Stage-1 是根stage,即开始的stage
  Stage-0 depends on stages: Stage-1	--Stage-0 依赖Stage-1,Stage-1 执行完成之后执行Stage-0

STAGE PLANS:	--各个Stage的执行计划
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:	--Map端的执行计划树
          TableScan				--表扫描操作
            alias: test1	--alias: 表名称
            Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE	--Statistics: 表统计信息,包含表中数据条数,数据大小等
            Select Operator	--选取操作
              expressions: id (type: int)		--expressions:需要的字段名称及字段类型
              outputColumnNames: id					--outputColumnNames: 输出的列名称
              Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE --Statistics: 表统计信息,包含表中数据条数,数据大小等
              Group By Operator	--分组聚合操作
                aggregations: sum(id)	-- aggregations: 显示聚合函数信息
                mode: hash						-- mode: 聚合模式,值有hash(随机聚合,即分区方式为hash);partitial(局部集合);final(最终聚合)
                outputColumnNames: _col0	--outputColumnNames: 聚合之后输出列名
                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE --Statistics: 表统计信息,包含分组聚合之后的数据条数,数据大小等
                Reduce Output Operator	--输出到Reduce操作
                  sort order:		--sort order: 值为空 不排序;值为“+” 正序排序;值为“-” 倒序排序;值为“+-”  排序的列为两列,第一列为正序,第二列为倒序
                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                  value expressions: _col0 (type: bigint)
      Reduce Operator Tree:	--Reduce端的执行计划树
        Group By Operator
          aggregations: sum(VALUE._col0)
          mode: mergepartial
          outputColumnNames: _col0
          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
          File Output Operator	--文件输出操作
            compressed: false		--compressed: 是否压缩
            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
            table:	--表信息,包含输出输出文件格式化方式,序列化方式等
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator	--客户端获取数据操作
      limit: -1	--limit: 值为 -1 表示不限制条数,其他值为限制的条数
      Processor Tree:
        ListSink

一个HIVE查询被转换为一个由一个或多个stage组成的序列(有向无环图DAG)。这些stage可以是MapReduce stage,也可以是负责元数据存储的stage,也可以是负责文件系统的操作(比如移动和重命名)的stage

我们将上述结果拆分看,先从最外层开始,包含两个大的部分:

  1. stage dependencies: 各个stage之间的依赖性
  2. stage plan: 各个stage的执行计划

先看第一部分 stage dependencies ,包含两个 stage,Stage-1 是根stage,说明这是开始的stage,Stage-0 依赖 Stage-1,Stage-1执行完成后执行Stage-0。

再看第二部分 stage plan,里面有一个 Map Reduce,一个MR的执行计划分为两个部分:

  1. Map Operator Tree: MAP端的执行计划树
  2. Reduce Operator Tree: Reduce端的执行计划树

这两个执行计划树里面包含这条sql语句的 operator:

  1. map端第一个操作肯定是加载表,所以就是 TableScan 表扫描操作,常见的属性:

    • alias: 表名称
    • Statistics: 表统计信息,包含表中数据条数,数据大小等
  2. Select Operator: 选取操作,常见的属性 :

    • expressions:需要的字段名称及字段类型
    • outputColumnNames:输出的列名称
    • Statistics:表统计信息,包含表中数据条数,数据大小等
  3. Group By Operator:分组聚合操作,常见的属性:

    • aggregations:显示聚合函数信息
    • mode:聚合模式,值有 hash:随机聚合,就是hash partition;partial:局部聚合;final:最终聚合
    • keys:分组的字段,如果没有分组,则没有此字段
    • outputColumnNames:聚合之后输出列名
    • Statistics: 表统计信息,包含分组聚合之后的数据条数,数据大小等
  4. Reduce Output Operator:输出到reduce操作,常见属性:

    • sort order:值为空 不排序;值为 + 正序排序,值为 - 倒序排序;值为 ± 排序的列为两列,第一列为正序,第二列为倒序
  5. Filter Operator:过滤操作,常见的属性:

    • predicate:过滤条件,如sql语句中的where id>=1,则此处显示(id >= 1)
  6. Map Join Operator:join 操作,常见的属性:

    • condition map:join方式 ,如Inner Join 0 to 1 Left Outer Join0 to 2
    • keys: join 的条件字段
    • outputColumnNames: join 完成之后输出的字段
    • Statistics: join 完成之后生成的数据条数,大小等
  7. File Output Operator:文件输出操作,常见的属性

    • compressed:是否压缩
    • table:表的信息,包含输入输出文件格式化方式,序列化方式等
  8. Fetch Operator 客户端获取数据操作,常见的属性:

    • limit,值为 -1 表示不限制条数,其他值为限制的条数
实践
1. join 语句会过滤 null 的值吗?

现在,我们在hive cli 输入以下查询计划语句

select a.id,b.user_name from test1 a join test2 b on a.id=b.id;

问:上面这条 join 语句会过滤 id 为 null 的值吗

执行下面语句:

explain select a.id,b.user_name from test1 a join test2 b on a.id=b.id;

我们来看结果 (为了适应页面展示,仅截取了部分输出信息):

TableScan
 alias: a
 Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
 Filter Operator
    predicate: id is not null (type: boolean)
    Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
    Select Operator
        expressions: id (type: int)
        outputColumnNames: _col0
        Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
        HashTable Sink Operator
           keys:
             0 _col0 (type: int)
             1 _col0 (type: int)
 ...

从上述结果可以看到 predicate: id is not null 这样一行,说明 join 时会自动过滤掉关联字段为 null 值的情况,但 left join 或 full join 是不会自动过滤的,大家可以自行尝试下。

2. group by 分组语句会进行排序吗?

看下面这条sql

select id,max(user_name) from test1 group by id;

问:group by 分组语句会进行排序吗

直接来看 explain 之后结果 (为了适应页面展示,仅截取了部分输出信息)

 TableScan
    alias: test1
    Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
    Select Operator
        expressions: id (type: int), user_name (type: string)
        outputColumnNames: id, user_name
        Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
        Group By Operator
           aggregations: max(user_name)
           keys: id (type: int)		--按照id进行分组
           mode: hash
           outputColumnNames: _col0, _col1
           Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
           Reduce Output Operator
             key expressions: _col0 (type: int)
             sort order: +	--按照in进行正序排序
             Map-reduce partition columns: _col0 (type: int)
             Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
             value expressions: _col1 (type: string)
 ...

我们看 Group By Operator,里面有 keys: id (type: int) 说明按照 id 进行分组的,再往下看还有 sort order: + ,说明是按照 id 字段进行正序排序的

3. 哪条sql执行效率高呢?

观察两条sql语句

SELECT
    a.id,
    b.user_name
FROM
    test1 a
JOIN test2 b ON a.id = b.id
WHERE
    a.id > 2;
    
SELECT
    a.id,
    b.user_name
FROM
    (SELECT * FROM test1 WHERE id > 2) a
JOIN test2 b ON a.id = b.id;

这两条sql语句输出的结果是一样的,但是哪条sql执行效率高呢
有人说第一条sql执行效率高,因为第二条sql有子查询,子查询会影响性能
有人说第二条sql执行效率高,因为先过滤之后,在进行join时的条数减少了,所以执行效率就高了

到底哪条sql效率高呢,我们直接在sql语句前面加上 explain,看下执行计划不就知道了嘛

在第一条sql语句前加上 explain,得到如下结果

hive (default)> explain select a.id,b.user_name from test1 a join test2 b on a.id=b.id where a.id >2;
OK
Explain
STAGE DEPENDENCIES:
  Stage-4 is a root stage
  Stage-3 depends on stages: Stage-4
  Stage-0 depends on stages: Stage-3

STAGE PLANS:
  Stage: Stage-4
    Map Reduce Local Work
      Alias -> Map Local Tables:
        $hdt$_0:a
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        $hdt$_0:a
          TableScan
            alias: a
            Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: (id > 2) (type: boolean)
              Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
              Select Operator
                expressions: id (type: int)
                outputColumnNames: _col0
                Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
                HashTable Sink Operator
                  keys:
                    0 _col0 (type: int)
                    1 _col0 (type: int)

  Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: b
            Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: (id > 2) (type: boolean)
              Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
              Select Operator
                expressions: id (type: int), user_name (type: string)
                outputColumnNames: _col0, _col1
                Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
                Map Join Operator
                  condition map:
                       Inner Join 0 to 1
                  keys:
                    0 _col0 (type: int)
                    1 _col0 (type: int)
                  outputColumnNames: _col0, _col2
                  Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                  Select Operator
                    expressions: _col0 (type: int), _col2 (type: string)
                    outputColumnNames: _col0, _col1
                    Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                    File Output Operator
                      compressed: false
                      Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                      table:
                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      Local Work:
        Map Reduce Local Work

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

在第二条sql语句前加上 explain,得到如下结果

hive (default)> explain select a.id,b.user_name from(select * from  test1 where id>2 ) a join test2 b on a.id=b.id;
OK
Explain
STAGE DEPENDENCIES:
  Stage-4 is a root stage
  Stage-3 depends on stages: Stage-4
  Stage-0 depends on stages: Stage-3

STAGE PLANS:
  Stage: Stage-4
    Map Reduce Local Work
      Alias -> Map Local Tables:
        $hdt$_0:test1
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        $hdt$_0:test1
          TableScan
            alias: test1
            Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: (id > 2) (type: boolean)
              Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
              Select Operator
                expressions: id (type: int)
                outputColumnNames: _col0
                Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
                HashTable Sink Operator
                  keys:
                    0 _col0 (type: int)
                    1 _col0 (type: int)

  Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: b
            Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: (id > 2) (type: boolean)
              Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
              Select Operator
                expressions: id (type: int), user_name (type: string)
                outputColumnNames: _col0, _col1
                Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
                Map Join Operator
                  condition map:
                       Inner Join 0 to 1
                  keys:
                    0 _col0 (type: int)
                    1 _col0 (type: int)
                  outputColumnNames: _col0, _col2
                  Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                  Select Operator
                    expressions: _col0 (type: int), _col2 (type: string)
                    outputColumnNames: _col0, _col1
                    Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                    File Output Operator
                      compressed: false
                      Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                      table:
                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      Local Work:
        Map Reduce Local Work

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

大家有什么发现,除了表别名不一样,其他的执行计划完全一样,都是先进行 where 条件过滤,在进行 join 条件关联。说明 hive 底层会自动帮我们进行优化,所以这两条sql语句执行效率是一样的

Map Join执行计划

执行计划

在 Map 操作树我们可以看到 Map Side Join Operator 关键字,就说明 join 是 mapjoin

hive> explain select a.* from passwords a,passwords3 b where a.col0=b.col0;
OK
STAGE DEPENDENCIES:
 Stage-4 is a root stage
 Stage-3 depends on stages: Stage-4
 Stage-0 is a root stage
STAGE PLANS:
 Stage: Stage-4
   Map Reduce Local Work
     Alias -> Map Local Tables:
       b
         Fetch Operator
           limit: -1
     Alias -> Map Local Operator Tree:
       b
         TableScan
           alias: b
           Statistics: Num rows: 1 Data size: 31 Basic stats: COMPLETE Column stats: NONE
           HashTable Sink Operator
             condition expressions:
               0 {col0} {col1} {col2} {col3} {col4} {col5} {col6}
               1 {col0}
             keys:
               0 col0 (type: string)
               1 col0 (type: string)
 Stage: Stage-3
   Map Reduce
     Map Operator Tree:
         TableScan
           alias: a
           Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: NONE
           Map Join Operator
             condition map:
                  Inner Join 0 to 1
             condition expressions:
               0 {col0} {col1} {col2} {col3} {col4} {col5} {col6}
               1 {col0}
keys:
               0 col0 (type: string)
               1 col0 (type: string)
             outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col9
             Statistics: Num rows: 10960295 Data size: 524940416 Basic stats: COMPLETE Column stats: NONE
             Filter Operator
               predicate: (_col0 = _col9) (type: boolean)
               Statistics: Num rows: 5480147 Data size: 262470184 Basic stats: COMPLETE Column stats: NONE
               Select Operator
                 expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), _col5 (type: string), _col6 (type: string)
                 outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
                 Statistics: Num rows: 5480147 Data size: 262470184 Basic stats: COMPLETE Column stats: NONE
                 File Output Operator
                   compressed: false
                   Statistics: Num rows: 5480147 Data size: 262470184 Basic stats: COMPLETE Column stats: NONE
                   table:
                       input format: org.apache.hadoop.mapred.TextInputFormat
                       output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
     Local Work:
       Map Reduce Local Work
 Stage: Stage-0
   Fetch Operator
     limit: -1

上述执行计划中:

  1. stage5 有一个Map Local TablesHashTable Sink Operator
  2. stage3 有一个Map Join Operator

两个stage结合起来完成了mapjoin 这样一个过程

总结:Hive对SQL语句性能问题排查的方式

  1. 使用explain查看执行计划;
  2. 查看YARN提供的日志

36. Hive SQL优化

37. 小文件过多

1. 使用Hive自带的contatenate命令,自动合并小文件

使用方法:

--	对于非分区表
alter table A concatenate;

--	对于分区表
alter table B partition(day=20201224) concatenate;

举例:

--	向 A 表中插入数据
hive (default)> insert into table A values (1,'aa',67),(2,'bb',87);
hive (default)> insert into table A values (3,'cc',67),(4,'dd',87);
hive (default)> insert into table A values (5,'ee',67),(6,'ff',87);

--	执行以上三条语句,则A表下就会有三个小文件,在hive命令行执行如下语句
--	查看A表下文件数量
hive (default)> dfs -ls /user/hive/warehouse/A;
Found 3 items
-rwxr-xr-x   3 root supergroup        378 2020-12-24 14:46 /user/hive/warehouse/A/000000_0
-rwxr-xr-x   3 root supergroup        378 2020-12-24 14:47 /user/hive/warehouse/A/000000_0_copy_1
-rwxr-xr-x   3 root supergroup        378 2020-12-24 14:48 /user/hive/warehouse/A/000000_0_copy_2

--	可以看到有三个小文件,然后使用 concatenate 进行合并
hive (default)> alter table A concatenate;

--	再次查看A表下文件数量
hive (default)> dfs -ls /user/hive/warehouse/A;
Found 1 items
-rwxr-xr-x   3 root supergroup        778 2020-12-24 14:59 /user/hive/warehouse/A/000000_0

--	已合并成一个文件

注意:
1、concatenate 命令只支持 RCFILE 和 ORC 文件类型。
2、使用concatenate命令合并小文件时不能指定合并后的文件数量,但可以多次执行该命令。
3、当多次使用concatenate后文件数量不在变化,这个跟参数 mapreduce.input.fileinputformat.split.minsize=256mb 的设置有关,可设定每个文件的最小size

2. 调整参数减少Map数量
  • 设置map输入合并小文件的相关参数
--	执行Map前进行小文件合并
--	CombineHiveInputFormat底层是 Hadoop的 CombineFileInputFormat 方法
--	此方法是在mapper中将多个文件合成一个split作为输入
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- 默认

--	每个Map最大输入大小(这个值决定了合并后文件的数量)
set mapred.max.split.size=256000000;   -- 256M

--	一个节点上split的至少的大小(这个值决定了多个DataNode上的文件是否需要合并)
set mapred.min.split.size.per.node=100000000;  -- 100M

--	一个交换机下split的至少的大小(这个值决定了多个交换机上的文件是否需要合并)
set mapred.min.split.size.per.rack=100000000;  -- 100M
  • 设置map输出和reduce输出进行合并的相关参数:
--	设置map端输出进行合并,默认为true
set hive.merge.mapfiles = true;

--	设置reduce端输出进行合并,默认为false
set hive.merge.mapredfiles = true;

--	设置合并文件的大小
set hive.merge.size.per.task = 256*1000*1000;   -- 256M

--	当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge
set hive.merge.smallfiles.avgsize=16000000;   -- 16M 
  • 启用压缩
--	 hive的查询结果输出是否进行压缩
set hive.exec.compress.output=true;

--	 MapReduce Job的结果输出是否使用压缩
set mapreduce.output.fileoutputformat.compress=true;
3. 减少Reduce的数量
--	reduce 的个数决定了输出的文件的个数,所以可以调整reduce的个数控制hive表的文件数量,
--	hive中的分区函数 distribute by 正好是控制MR中partition分区的,
--	然后通过设置reduce的数量,结合分区函数让数据均衡的进入每个reduce即可。

--	设置reduce的数量有两种方式,第一种是直接设置reduce个数
set mapreduce.job.reduces=10;

--	第二种是设置每个reduce的大小,Hive会根据数据总大小猜测确定一个reduce个数
set hive.exec.reducers.bytes.per.reducer=5120000000; -- 默认是1G,设置为5G

--	执行以下语句,将数据均衡的分配到reduce中
set mapreduce.job.reduces=10;
insert overwrite table A partition(dt)
select * from B
distribute by rand();

解释:如设置reduce数量为10,则使用 rand(), 随机生成一个数 x % 10 ,
这样数据就会随机进入 reduce 中,防止出现有的文件过大或过小
4. 使用hadoop的archive将小文件归档

Hadoop Archive简称HAR,是一个高效地将小文件放入HDFS块中的文件存档工具,它能够将多个小文件打包成一个HAR文件,这样在减少namenode内存使用的同时,仍然允许对文件进行透明的访问

--	用来控制归档是否可用
set hive.archive.enabled=true;
--	通知Hive在创建归档时是否可以设置父目录
set hive.archive.har.parentdir.settable=true;
--	控制需要归档文件的大小
set har.partfile.size=1099511627776;

--	使用以下命令进行归档
ALTER TABLE A ARCHIVE PARTITION(dt='2020-12-24', hr='12');

--	对已归档的分区恢复为原文件
ALTER TABLE A UNARCHIVE PARTITION(dt='2020-12-24', hr='12');

注意:
归档的分区可以查看不能 insert overwrite,必须先 unarchive小文件过多

1. 使用Hive自带的contatenate命令,自动合并小文件

使用方法:

--	对于非分区表
alter table A concatenate;

--	对于分区表
alter table B partition(day=20201224) concatenate;

举例:

--	向 A 表中插入数据
hive (default)> insert into table A values (1,'aa',67),(2,'bb',87);
hive (default)> insert into table A values (3,'cc',67),(4,'dd',87);
hive (default)> insert into table A values (5,'ee',67),(6,'ff',87);

--	执行以上三条语句,则A表下就会有三个小文件,在hive命令行执行如下语句
--	查看A表下文件数量
hive (default)> dfs -ls /user/hive/warehouse/A;
Found 3 items
-rwxr-xr-x   3 root supergroup        378 2020-12-24 14:46 /user/hive/warehouse/A/000000_0
-rwxr-xr-x   3 root supergroup        378 2020-12-24 14:47 /user/hive/warehouse/A/000000_0_copy_1
-rwxr-xr-x   3 root supergroup        378 2020-12-24 14:48 /user/hive/warehouse/A/000000_0_copy_2

--	可以看到有三个小文件,然后使用 concatenate 进行合并
hive (default)> alter table A concatenate;

--	再次查看A表下文件数量
hive (default)> dfs -ls /user/hive/warehouse/A;
Found 1 items
-rwxr-xr-x   3 root supergroup        778 2020-12-24 14:59 /user/hive/warehouse/A/000000_0

--	已合并成一个文件

注意:
1、concatenate 命令只支持 RCFILE 和 ORC 文件类型。
2、使用concatenate命令合并小文件时不能指定合并后的文件数量,但可以多次执行该命令。
3、当多次使用concatenate后文件数量不在变化,这个跟参数 mapreduce.input.fileinputformat.split.minsize=256mb 的设置有关,可设定每个文件的最小size

2. 调整参数减少Map数量
  • 设置map输入合并小文件的相关参数
--	执行Map前进行小文件合并
--	CombineHiveInputFormat底层是 Hadoop的 CombineFileInputFormat 方法
--	此方法是在mapper中将多个文件合成一个split作为输入
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- 默认

--	每个Map最大输入大小(这个值决定了合并后文件的数量)
set mapred.max.split.size=256000000;   -- 256M

--	一个节点上split的至少的大小(这个值决定了多个DataNode上的文件是否需要合并)
set mapred.min.split.size.per.node=100000000;  -- 100M

--	一个交换机下split的至少的大小(这个值决定了多个交换机上的文件是否需要合并)
set mapred.min.split.size.per.rack=100000000;  -- 100M
  • 设置map输出和reduce输出进行合并的相关参数:
--	设置map端输出进行合并,默认为true
set hive.merge.mapfiles = true;

--	设置reduce端输出进行合并,默认为false
set hive.merge.mapredfiles = true;

--	设置合并文件的大小
set hive.merge.size.per.task = 256*1000*1000;   -- 256M

--	当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge
set hive.merge.smallfiles.avgsize=16000000;   -- 16M 
  • 启用压缩
--	 hive的查询结果输出是否进行压缩
set hive.exec.compress.output=true;

--	 MapReduce Job的结果输出是否使用压缩
set mapreduce.output.fileoutputformat.compress=true;
3. 减少Reduce的数量
--	reduce 的个数决定了输出的文件的个数,所以可以调整reduce的个数控制hive表的文件数量,
--	hive中的分区函数 distribute by 正好是控制MR中partition分区的,
--	然后通过设置reduce的数量,结合分区函数让数据均衡的进入每个reduce即可。

--	设置reduce的数量有两种方式,第一种是直接设置reduce个数
set mapreduce.job.reduces=10;

--	第二种是设置每个reduce的大小,Hive会根据数据总大小猜测确定一个reduce个数
set hive.exec.reducers.bytes.per.reducer=5120000000; -- 默认是1G,设置为5G

--	执行以下语句,将数据均衡的分配到reduce中
set mapreduce.job.reduces=10;
insert overwrite table A partition(dt)
select * from B
distribute by rand();

解释:如设置reduce数量为10,则使用 rand(), 随机生成一个数 x % 10 ,
这样数据就会随机进入 reduce 中,防止出现有的文件过大或过小
4. 使用hadoop的archive将小文件归档

Hadoop Archive简称HAR,是一个高效地将小文件放入HDFS块中的文件存档工具,它能够将多个小文件打包成一个HAR文件,这样在减少namenode内存使用的同时,仍然允许对文件进行透明的访问

--	用来控制归档是否可用
set hive.archive.enabled=true;
--	通知Hive在创建归档时是否可以设置父目录
set hive.archive.har.parentdir.settable=true;
--	控制需要归档文件的大小
set har.partfile.size=1099511627776;

--	使用以下命令进行归档
ALTER TABLE A ARCHIVE PARTITION(dt='2020-12-24', hr='12');

--	对已归档的分区恢复为原文件
ALTER TABLE A UNARCHIVE PARTITION(dt='2020-12-24', hr='12');

注意:
归档的分区可以查看不能 insert overwrite,必须先 unarchive

Hive/Hadoop高频面试点集合

1、Hive的两张表关联,使用MapReduce怎么实现?

如果其中有一张表为小表,直接使用map端join的方式(map端加载小表)进行聚合。

如果两张都是大表,那么采用联合key,联合key的第一个组成部分是join on中的公共字段,第二部分是一个flag,0代表表A,1代表表B,由此让Reduce区分客户信息和订单信息;在Mapper中同时处理两张表的信息,将join on公共字段相同的数据划分到同一个分区中,进而传递到一个Reduce中,然后在Reduce中实现聚合。

2、请谈一下Hive的特点,Hive和RDBMS有什么异同?

hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供完整的sql查询功能,可以将sql语句转换为MapReduce任务进行运行。其优点是学习成本低,可以通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析,但是Hive不支持实时查询。

Hive与关系型数据库的区别:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-L3FsV193-1688880016422)(data:image/gif;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVQImWNgYGBgAAAABQABh6FO1AAAAABJRU5ErkJggg==)]

3、请说明hive中 Sort By,Order By,Cluster By,Distrbute By各代表什么意思?

Order by:会对输入做全局排序,因此只有一个reducer(多个reducer无法保证全局有序)。只有一个reducer,会导致当输入规模较大时,需要较长的计算时间。

Sort by:不是全局排序,其在数据进入reducer前完成排序。1

Distribute by:按照指定的字段对数据进行划分输出到不同的reduce中。

Cluster by:除了具有 distribute by 的功能外还兼具 sort by 的功能。

4、写出Hive中split、coalesce及collect_list函数的用法(可举例)?

split将字符串转化为数组,即:split(‘a,b,c,d’ , ‘,’) ==> [“a”,“b”,“c”,“d”]。

coalesce(T v1, T v2, …) 返回参数中的第一个非空值;如果所有值都为 NULL,那么返回NULL。

collect_list列出该字段所有的值,不去重 => select collect_list(id) from table。

5、 Hive有哪些方式保存元数据,各有哪些特点?

Hive支持三种不同的元存储服务器,分别为:内嵌式元存储服务器、本地元存储服务器、远程元存储服务器,每种存储方式使用不同的配置参数。

内嵌式元存储主要用于单元测试,在该模式下每次只有一个进程可以连接到元存储,Derby是内嵌式元存储的默认数据库。

在本地模式下,每个Hive客户端都会打开到数据存储的连接并在该连接上请求SQL查询。

在远程模式下,所有的Hive客户端都将打开一个到元数据服务器的连接,该服务器依次查询元数据,元数据服务器和客户端之间使用Thrift协议通信。

6、Hive内部表和外部表的区别?

创建表时:创建内部表时,会将数据移动到数据仓库指向的路径;若创建外部表,仅记录数据所在的路径,不对数据的位置做任何改变。

删除表时:在删除表的时候,内部表的元数据和数据会被一起删除, 而外部表只删除元数据,不删除数据。这样外部表相对来说更加安全些,数据组织也更加灵活,方便共享源数据。

7、Hive的函数:UDF、UDAF、UDTF的区别?

UDF:单行进入,单行输出

UDAF:多行进入,单行输出

UDTF:单行输入,多行输出

8、所有的Hive任务都会有MapReduce的执行吗?

不是,从Hive0.10.0版本开始,对于简单的不需要聚合的类似SELECT from

LIMIT n语句,不需要起MapReduce job,直接通过Fetch task获取数据。

9、说说对Hive桶表的理解?

桶表是对数据某个字段进行哈希取值,然后放到不同文件中存储。

数据加载到桶表时,会对字段取hash值,然后与桶的数量取模。把数据放到对应的文件中。物理上,每个桶就是表(或分区)目录里的一个文件,一个作业产生的桶(输出文件)和reduce任务个数相同。

桶表专门用于抽样查询,是很专业性的,不是日常用来存储数据的表,需要抽样查询时,才创建和使用桶表。

10、Hive底层与数据库交互原理?

Hive 的查询功能是由 HDFS 和 MapReduce结合起来实现的,对于大规模数据查询还是不建议在 hive 中,因为过大数据量会造成查询十分缓慢。Hive 与 MySQL的关系:只是借用 MySQL来存储 hive 中的表的元数据信息,称为 metastore(元数据信息)。

11、Hive本地模式

大多数的Hadoop Job是需要Hadoop提供的完整的可扩展性来处理大数据集的。不过,有时Hive的输入数据量是非常小的。在这种情况下,为查询触发执行任务时消耗可能会比实际job的执行时间要多的多。对于大多数这种情况,Hive可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短。

用户可以通过设置hive.exec.mode.local.auto的值为true,来让Hive在适当的时候自动启动这个优化。

12、Hive 中的压缩格式TextFile、SequenceFile、RCfile 、ORCfile各有什么区别?

1、TextFile

默认格式,存储方式为行存储,数据不做压缩,磁盘开销大,数据解析开销大。可结合Gzip、Bzip2使用(系统自动检查,执行查询时自动解压),但使用这种方式,压缩后的文件不支持split,Hive不会对数据进行切分,从而无法对数据进行并行操作。并且在反序列化过程中,必须逐个字符判断是不是分隔符和行结束符,因此反序列化开销会比SequenceFile高几十倍。

2、SequenceFile

SequenceFile是Hadoop API提供的一种二进制文件支持,存储方式为行存储,其具有使用方便、可分割、可压缩的特点。

SequenceFile支持三种压缩选择:NONE,RECORD,BLOCK。Record压缩率低,一般建议使用BLOCK压缩。

优势是文件和hadoop api中的MapFile是相互兼容的

3、RCFile

存储方式:数据按行分块,每块按列存储。结合了行存储和列存储的优点:

首先,RCFile 保证同一行的数据位于同一节点,因此元组重构的开销很低;

其次,像列存储一样,RCFile 能够利用列维度的数据压缩,并且能跳过不必要的列读取;

4、ORCFile

存储方式:数据按行分块 每块按照列存储。

压缩快、快速列存取。

效率比rcfile高,是rcfile的改良版本。

小结:

相比TEXTFILE和SEQUENCEFILE,RCFILE由于列式存储方式,数据加载时性能消耗较大,但是具有较好的压缩比和查询响应。

数据仓库的特点是一次写入、多次读取,因此,整体来看,RCFILE相比其余两种格式具有较明显的优势。

13、Hive表关联查询,如何解决数据倾斜的问题?

1)倾斜原因:map输出数据按key Hash的分配到reduce中,由于key分布不均匀、业务数据本身的特、建表时考虑不周、等原因造成的reduce 上的数据量差异过大。(1)key分布不均匀; (2)业务数据本身的特性; (3)建表时考虑不周; (4)某些SQL语句本身就有数据倾斜;

如何避免:对于key为空产生的数据倾斜,可以对其赋予一个随机值。

2)解决方案

(1)参数调节:    hive.map.aggr = true     hive.groupby.skewindata=true

有数据倾斜的时候进行负载均衡,当选项设定位true,生成的查询计划会有两个MR Job。第一个MR Job中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MR Job再根据预处理的数据结果按照Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个Reduce中),最后完成最终的聚合操作。

(2)SQL 语句调节:

① 选用join key分布最均匀的表作为驱动表。做好列裁剪和filter操作,以达到两表做join 的时候,数据量相对变小的效果。  ② 大小表Join:    使用map join让小的维度表(1000 条以下的记录条数)先进内存。在map端完成reduce。  ③ 大表Join大表:    把空值的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null 值关联不上,处理后并不影响最终结果。  ④ count distinct大量相同特殊值:     count distinct 时,将值为空的情况单独处理,如果是计算count distinct,可以不用处理,直接过滤,在最后结果中加1。如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union。

14、Fetch抓取

Fetch抓取是指,Hive中对某些情况的查询可以不必使用MapReduce计算。例如:SELECT * FROM employees;在这种情况下,Hive可以简单地读取employee对应的存储目录下的文件,然后输出查询结果到控制台。

在hive-default.xml.template文件中hive.fetch.task.conversion默认是more,老版本hive默认是minimal,该属性修改为more以后,在全局查找、字段查找、limit查找等都不走mapreduce。

15、小表、大表Join

将key相对分散,并且数据量小的表放在join的左边,这样可以有效减少内存溢出错误发生的几率;再进一步,可以使用Group让小的维度表(1000条以下的记录条数)先进内存。在map端完成reduce。

实际测试发现:新版的hive已经对小表JOIN大表和大表JOIN小表进行了优化。小表放在左边和右边已经没有明显区别。

16、大表Join大表

1)空KEY过滤   有时join超时是因为某些key对应的数据太多,而相同key对应的数据都会发送到相同的reducer上,从而导致内存不够。此时我们应该仔细分析这些异常的key,很多情况下,这些key对应的数据是异常数据,我们需要在SQL语句中进行过滤。例如key对应的字段为空。2)空key转换   有时虽然某个key为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在join的结果中,此时我们可以表a中key为空的字段赋一个随机的值,使得数据随机均匀地分不到不同的reducer上。

17、Group By

默认情况下,Map阶段同一Key数据分发给一个reduce,当一个key数据过大时就倾斜了。

并不是所有的聚合操作都需要在Reduce端完成,很多聚合操作都可以先在Map端进行部分聚合,最后在Reduce端得出最终结果。1)开启Map端聚合参数设置     (1)是否在Map端进行聚合,默认为True       hive.map.aggr = true     (2)在Map端进行聚合操作的条目数目       hive.groupby.mapaggr.checkinterval = 100000     (3)有数据倾斜的时候进行负载均衡(默认是false)       hive.groupby.skewindata = true

当选项设定为 true,生成的查询计划会有两个MR Job。第一个MR Job中,Map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;

第二个MR Job再根据预处理的数据结果按照Group By Key分布到Reduce中(这个过程可以保证相同的Group By Key被分布到同一个Reduce中),最后完成最终的聚合操作。

18、Count(Distinct) 去重统计

数据量小的时候无所谓,数据量大的情况下,由于COUNT DISTINCT操作需要用一个Reduce Task来完成,这一个Reduce需要处理的数据量太大,就会导致整个Job很难完成,一般COUNT DISTINCT使用先GROUP BY再COUNT的方式替换

尽量避免笛卡尔积,join的时候不加on条件,或者无效的on条件,Hive只能使用1个reducer来完成笛卡尔积

20、行列过滤

列处理:在SELECT中,只拿需要的列,如果有,尽量使用分区过滤,少用SELECT *。

行处理:在分区剪裁中,当使用外关联时,如果将副表的过滤条件写在Where后面,那么就会先全表关联,之后再过滤。

21、并行执行

Hive会将一个查询转化成一个或者多个阶段。这样的阶段可以是MapReduce阶段、抽样阶段、合并阶段、limit阶段。或者Hive执行过程中可能需要的其他阶段。默认情况下,Hive一次只会执行一个阶段。不过,某个特定的job可能包含众多的阶段,而这些阶段可能并非完全互相依赖的,也就是说有些阶段是可以并行执行的,这样可能使得整个job的执行时间缩短。不过,如果有更多的阶段可以并行执行,那么job可能就越快完成。

通过设置参数hive.exec.parallel值为true,就可以开启并发执行。不过,在共享集群中,需要注意下,如果job中并行阶段增多,那么集群利用率就会增加

Round 2:Hbase

1、HBase读写数据流程:

  • HBase写数据和存数据的过程:

HBase数据的写入过程:

1)Client访问zookeeper,获取元数据存储所在的regionserver

2)拿到对应的表存储的regionserver,通过刚刚获取的地址访问对应的regionserver,

3)去表所在的regionserver进行数据的添加

4)查找对应的region,在region中寻找列族,先向memstore中写入数据

5)当memstore写入的值变多,触发溢写操作(flush),进行文件的溢写,成为一个StoreFile

6)当溢写的文件过多时,会触发文件的合并(Compact)操作,合并有两种方式(major,minor)

多个StoreFile合并成一个StoreFile,同时进行版本合并和数据删除

minor compaction:小范围合并,默认是3-10个文件进行合并,不会删除其他版本的数据。

major compaction:将当前目录下的所有文件全部合并,一般手动触发,会删除其他版本的数据(不同时间戳的)

7)当region中的数据逐渐变大之后,达到某一个阈值,会进行裂变(一个region等分为两个region,并分配到不同的regionserver),原本的Region会下线,新Split出来的两个Region会被HMaster分配到相应的HRegionServer上,使得原先1个Region的压力得以分流到2个Region上。

由此可知HBase只是增加数据,所有的更新和删除操作,都是在Compact阶段做的,所以用户写操作只需要进入到内存即可立即返回,从而保证I/O高性能读写。

  • HBase数据的读取流程:

1)Client访问zookeeper,获取元数据存储所在的regionserver

2)通过刚刚获取的地址访问对应的regionserver,拿到对应的表存储的regionserver

3)去表所在的regionserver进行数据的读取

4)查找对应的region,在region中寻找列族,先找到memstore,找不到去blockcache中寻找,再找不到就进行storefile的遍历

5)找到数据之后会先缓存到blockcache中,再将结果返回

blockcache逐渐满了之后,会采用LRU的淘汰策略。

2、HDFS和HBase各自使用场景
首先一点需要明白:Hbase是基于HDFS来存储的。

HDFS:

1)一次性写入,多次读取。

2)保证数据的一致性。

3)主要是可以部署在许多廉价机器中,通过多副本提高可靠性,提供了容错和恢复机制。

HBase:

1)瞬间写入量很大,数据库不好支撑或需要很高成本支撑的场景。

2)数据需要长久保存,且量会持久增长到比较大的场景。

3)HBase不适用与有 join,多级索引,表关系复杂的数据模型。

4)大数据量(100s TB级数据)且有快速随机访问的需求。如:淘宝的交易历史记录。数据量巨大无容置疑,面向普通用户的请求必然要即时响应。

5)业务场景简单,不需要关系数据库中很多特性(例如交叉列、交叉表,事务,连接等等)。

3、Hbase的存储结构

Hbase 中的每张表都通过行键(rowkey)按照一定的范围被分割成多个子表(HRegion),默认一个HRegion 超过256M 就要被分割成两个,由HRegionServer管理,管理哪些 HRegion 由 Hmaster 分配。 HRegion 存取一个子表时,会创建一个 HRegion 对象,然后对表的每个列族(Column Family)创建一个 store 实例, 每个 store 都会有 0 个或多个 StoreFile 与之对应,每个 StoreFile 都会对应一个HFile,HFile 就是实际的存储文件,一个 HRegion 还拥有一个 MemStore实例。

4、热点现象(数据倾斜)怎么产生的,以及解决方法有哪些

热点现象:
某个小的时段内,对HBase的读写请求集中到极少数的Region上,导致这些region所在的RegionServer处理请求量骤增,负载量明显偏大,而其他的RgionServer明显空闲。
热点现象出现的原因:
HBase中的行是按照rowkey的字典顺序排序的,这种设计优化了scan操作,可以将相关的行以及会被一起读取的行存取在临近位置,便于scan。然而糟糕的rowkey设计是热点的源头。
热点发生在大量的client直接访问集群的一个或极少数个节点(访问可能是读,写或者其他操作)。大量访问会使热点region所在的单个机器超出自身承受能力,引起性能下降甚至region不可用,这也会影响同一个RegionServer上的其他region,由于主机无法服务其他region的请求。
热点现象解决办法:
为了避免写热点,设计rowkey使得不同行在同一个region,但是在更多数据情况下,数据应该被写入集群的多个region,而不是一个。常见的方法有以下这些:

1)加盐:在rowkey的前面增加随机数,使得它和之前的rowkey的开头不同。分配的前缀种类数量应该和你想使用数据分散到不同的region的数量一致。加盐之后的rowkey就会根据随机生成的前缀分散到各个region上,以避免热点。

2)哈希:哈希可以使负载分散到整个集群,但是读却是可以预测的。使用确定的哈希可以让客户端重构完整的rowkey,可以使用get操作准确获取某一个行数据

3)反转:第三种防止热点的方法时反转固定长度或者数字格式的rowkey。这样可以使得rowkey中经常改变的部分(最没有意义的部分)放在前面。这样可以有效的随机rowkey,但是牺牲了rowkey的有序性。反转rowkey的例子以手机号为rowkey,可以将手机号反转后的字符串作为rowkey,这样的就避免了以手机号那样比较固定开头导致热点问题

4)时间戳反转:一个常见的数据处理问题是快速获取数据的最近版本,使用反转的时间戳作为rowkey的一部分对这个问题十分有用,可以用 Long.Max_Value - timestamp 追加到key的末尾,例如[key][reverse_timestamp],[key]的最新值可以通过scan [key]获得[key]的第一条记录,因为HBase中rowkey是有序的,第一条记录是最后录入的数据。

  • 比如需要保存一个用户的操作记录,按照操作时间倒序排序,在设计rowkey的时候,可以这样设计[userId反转] [Long.Max_Value - timestamp],在查询用户的所有操作记录数据的时候,直接指定反转后的userId,startRow是[userId反转][000000000000],stopRow是[userId反转][Long.Max_Value - timestamp]
  • 如果需要查询某段时间的操作记录,startRow是[user反转][Long.Max_Value - 起始时间],stopRow是[userId反转][Long.Max_Value - 结束时间]

5)HBase建表预分区:创建HBase表时,就预先根据可能的RowKey划分出多个region而不是默认的一个,从而可以将后续的读写操作负载均衡到不同的region上,避免热点现象。

11、HBase的 rowkey 设计原则
长度原则:100字节以内,8的倍数最好,可能的情况下越短越好。因为HFile是按照 keyvalue 存储的,过长的rowkey会影响存储效率;其次,过长的rowkey在memstore中较大,影响缓冲效果,降低检索效率。最后,操作系统大多为64位,8的倍数,充分利用操作系统的最佳性能。
散列原则:高位散列,低位时间字段。避免热点问题。
唯一原则:分利用这个排序的特点,将经常读取的数据存储到一块,将最近可能会被访问 的数据放到一块。

12、HBase的列簇设计
原则:在合理范围内能尽量少的减少列簇就尽量减少列簇,因为列簇是共享region的,每个列簇数据相差太大导致查询效率低下。
最优:将所有相关性很强的 key-value 都放在同一个列簇下,这样既能做到查询效率最高,也能保持尽可能少的访问不同的磁盘文件。以用户信息为例,可以将必须的基本信息存放在一个列族,而一些附加的额外信息可以放在另一列族。

13、HBase 中 compact 用途是什么,什么时候触发,分为哪两种,有什么区别
在 hbase 中每当有 memstore 数据 flush 到磁盘之后,就形成一个 storefile,当 storeFile的数量达到一定程度后,就需要将 storefile 文件来进行 compaction 操作。
Compact 的作用:

1)合并文件
2)清除过期,多余版本的数据

16、提高读写数据的效率HBase 中实现了两种 compaction 的方式:

minor and major. 这两种 compaction 方式的 区别是:

1)Minor 操作只用来做部分文件的合并操作以及包括 minVersion=0 并且设置 ttl 的过 期版本清理,不做任何删除数据、多版本数据的清理工作。
2)Major 操作是对 Region 下的 HStore 下的所有 StoreFile 执行合并操作,最终的结果 是整理合并出一个文件。

HBase调优

  1. HBase高可用
  2. 预分区

    每一个 region 维护着 StartRow 与 EndRow,如果加入的数据符合某个 Region 维护的 RowKey 范围,则该数据交给这个 Region 维护。那么依照这个原则,我们可以将数据所要 投放的分区提前大致的规划好,以提高 HBase 性能

    手动设定预分区:

    hbase> create 'user','info','partition1',SPLITS => ['a','c','f','h']
    

    生成16进制序列预分区:

    create 'user2','info','partition2',{NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}
    

    按照文件中设置的规则预分区:

    # 文件位置在HBase安装目录内创建
    touch split.ext
    10
    20
    30
    40
    
    # 执行预分区
    create 'user3','partition3',SPLITS_FILE => 'splits.txt'
    

    使用JavaApi创建预分区:

    //自定义算法,产生一系列Hash散列值存储在二维数组中
    byte[][] splitKeys = 某个散列值函数
    //创建HBaseAdmin实例
    HBaseAdmin hAdmin = new HBaseAdmin(HBaseConfiguration.create());
    //创建HTableDescriptor实例
    HTableDescriptor tableDesc = new HTableDescriptor(tableName);
    //通过HTableDescriptor实例和散列值二维数组创建带有预分区的HBase表
    hAdmin.createTable(tableDesc, splitKeys);
    
  3. 热点预防(优化Rowkey设计)

    一条数据的唯一标识就是 RowKey,那么这条数据存储于哪个分区,取决于 RowKey 处 于哪个一个预分区的区间内,设计 RowKey 的主要目的 ,就是让数据均匀的分布于所有的 region 中,在一定程度上防止数据倾斜

    • **加盐(生成随机数):**在Rowkey的前面增加随机数,散列之后的Rowkey就会根据随机生成的前缀分散到各个Region上,可以有效的避免热点问题 注:生成随机数这种方式,增加了写的吞吐,但是使得读数据更加困难
    • **Hash:**Hash算法包含了MD5等算法,可以直接取Rowkey的MD5值作为Rowkey,或者取MD5值拼接原始Rowkey,组成新的rowkey,由于Rowkey设计不应该太长,所以可以对MD5值进行截取拼接
    • **字符串反转:**时间戳反转、手机号反转
    • **Rowkey长度:**Rowkey长度最长为64kb,建议越短越好,最好不超过16字节。
      • 操作系统大都为64位,内存8字节对齐,控制在16字节,8字节的整数倍利用了操作系统的最佳特性
      • HBase将部分数据加载到内存中,若RowKey过长,内存的有效利用率会下降
    • RowKey唯一:同一个版本同一个RowKey插入HBase会更新之前的数据,与需求不符
  4. 内存优化

    HBase 操作过程中需要大量的内存开销,毕竟 Table 是可以缓存在内存中的,一般会分配整个可用内存的 70%给 HBase 的 Java 堆。但是不建议分配非常大的堆内存,因为 GC 过 程持续太久会导致 RegionServer 处于长期不可用状态,一般 16~48G 内存就可以了,如果因 为框架占用内存过高导致系统内存不足,框架一样会被系统服务拖死

  5. 压缩

    生产系统应使用其ColumnFamily定义进行压缩

  6. Column数控制

    Hbase中的每个列,都归属于某个列簇,列簇是表的schema的一部分(列不是),必须在使用之前定义

    HBase 目前对于两列族或三列族以上的任何项目都不太合适,因此请将模式中的列族数量保持在较低水平

    目前,flushing 和 compactions 是按照每个区域进行的,所以如果一个列族承载大量数据带来的 flushing,即使所携带的数据量很小,也会 flushing 相邻的列族。当许多列族存在时,flushing 和 compactions 相互作用可能会导致一堆不必要的 I/O(要通过更改 flushing 和 compactions 来针对每个列族进行处理)

  7. 开启布隆过滤器

    当我们随机读get数据时,如果采用hbase的块索引机制,hbase会加载很多块文件

    采用布隆过滤器后,它能够准确判断该HFile的所有数据块中是否含有我们查询的数据,从而大大减少不必要的块加载,增加吞吐,降低内存消耗,提高性能

    在读取数据时,hbase会首先在布隆过滤器中查询,根据布隆过滤器的结果,再在MemStore中查询,最后再在对应的HFile中查询

2. hbase 的 rowkey 怎么创建好?列族怎么创建比较好

  • rowkey:一条数据的唯一标识就是 RowKey,那么这条数据存储于哪个分区,取决于 RowKey 处 于哪个一个预分区的区间内,设计 RowKey 的主要目的 ,就是让数据均匀的分布于所有的 region 中,在一定程度上防止数据倾斜

    • **加盐(生成随机数):**在Rowkey的前面增加随机数,散列之后的Rowkey就会根据随机生成的前缀分散到各个Region上,可以有效的避免热点问题 注:生成随机数这种方式,增加了写的吞吐,但是使得读数据更加困难
    • **Hash:**Hash算法包含了MD5等算法,可以直接取Rowkey的MD5值作为Rowkey,或者取MD5值拼接原始Rowkey,组成新的rowkey,由于Rowkey设计不应该太长,所以可以对MD5值进行截取拼接
    • **字符串反转:**时间戳反转、手机号反转
    • **Rowkey长度:**Rowkey长度最长为64kb,建议越短越好,最好不超过16字节。
      • 操作系统大都为64位,内存8字节对齐,控制在16字节,8字节的整数倍利用了操作系统的最佳特性
      • HBase将部分数据加载到内存中,若RowKey过长,内存的有效利用率会下降
    • RowKey唯一:同一个版本同一个RowKey插入HBase会更新之前的数据,与需求不符
  • 列族:一个列族在数据底层是一个文件,所以将经常一起查询的列放到一个列族中,列族尽量少, 减少文件的寻址时间

3. hbase 过滤器实现用途

布隆过滤器是一种多哈希函数映射的快速查找算法(存储结构),可以实现用很小的空间和运算代价,来实现海量数据的存在与否的记录(黑白名单判断)。特点是高效的插入和查询,可以判断出一定不存在和可能存在

相比于传统的 List、Set、Map 等数据结构,它更高效、占用空间更少,但是缺点是其返回的可能存在结果是概率性的,而不是确切的

布隆过滤器是hbase中的高级功能,它能够减少特定访问模式(get/scan)下的查询时间。不过由于这种模式增加了内存和存储的负担,所以被默认为关闭状态

类型 描述
NONE 不使用布隆过滤器
ROW 行键使用布隆过滤器
ROWCOL 列键使用布隆过滤器

当我们随机读get数据时,如果采用hbase的块索引机制,hbase会加载很多块文件

采用布隆过滤器后,它能够准确判断该HFile的所有数据块中是否含有我们查询的数据,从而大大减少不必要的块加载,增加吞吐,降低内存消耗,提高性能

在读取数据时,hbase会首先在布隆过滤器中查询,根据布隆过滤器的结果,再在MemStore中查询,最后再在对应的HFile中查询

4. HBase 宕机如何处理

宕机分为 HMaster 宕机和 HRegisoner 宕机:

  • 如果是 HRegisoner 宕机,HMaster 会将其所管理的 region 重新分布到其他活动的 RegionServer 上,由于数据和日志都持久在 HDFS 中, 该操作不会导致数据丢失。所以数据的一致性和安全性是有保障的
  • 如果是 HMaster 宕机,HMaster 没有单点问题,HBase 中可以启动多个 HMaster,通过 Zookeeper 的 Master Election 机制保证总有一个 Master 运行。即 ZooKeeper 会保证总会有一 个 HMaster 在对外提供服务

5. Hive和HBase的区别是什么

Hive HBase
基于Hadoop的数据仓库 Hadoop的数据库
HiveSql转换为MR执行,速度慢 NoSql数据库,物理表,读写速度快,适合大批量的即时查询

Hive清洗处理后的数据有可能被写入HBase

6. HBase读写流程

写流程
  1. 客户端连接Zookeeper,获取到hbase:meta表所在的regionserver(host:port)
  2. 客户端访问对应Regionserver,获取到meta表,根据请求的信息,得到目标数据应该位于哪个Regionserver中的哪个Region。并将该Table的Region信息缓存在客户端的metaCache
  3. 与目标Regionserver通信,将数据写入WAL(预写入日志)
  4. 将数据写入对应的MemStore,数据会在MemStore中排序
  5. 向客户端发送Ask
  6. 等MemStore的刷写时机到达,将数据刷写入HFile
读流程
  1. 客户端连接Zookeeper,获取到hbase:meta表所在的regionserver(host:port)
  2. 客户端访问对应Regionserver,获取到meta表,根据请求的信息,得到目标数据应该位于哪个Regionserver中的哪个Region。并将该Table的Region信息缓存在客户端的metaCache
  3. 与目标RegionServer通信
  4. 分别在Block Cache(读缓存)、MemStore和StoreFile中查询目标数据,并将查询到的所有数据进行合并
  5. 将从文件中查询到的数据块缓存到BlockCache
  6. 将合并后的结果返回给客户端

7. HBase数据Flush过程

  1. 当MemStore达到阈值(默认128M,老版本64M),将数据刷到硬盘,将内存中的数据删除,同时删除HLog中的历史数据
  2. 并将数据存储到HDFS中
  3. 在HLog中做标记点

9. 数据合并过程

当数据块达到4块,HMaster将数据块加载到本地,进行合并

当合并的数据超过256M,进行拆分,将拆分后的Region分配给不同的HregionServer管理

当RegionServer宕机后,将RegionServer上的HLog拆分,然后分配给不同的RegionServer加载,修改.META

10. HMaster和HRionserver职责

HMaster:

  • 管理用户对表的增删改查操作
  • 记录Region在哪个HregionServer上
  • 在Region Split之后,负责新Region的分配
  • 新机器加入时,管理HRegionServer的负载均衡,调整Region分布
  • 在HRegionServer宕机后,负责失效HRegionServer上的Region迁移

HRegionServer

  • 负责响应用户I/O请求,向HDFS文件系统中读写数据,是HBase中最新核心的模块
  • HRegionServer管理Region

11. HBase列族和Region的关系

HBase有多个RegionServer,每个RegionServer里面有多个Region,一个Region中存放着若干行的行键及所对应的数据,一个列族是一个文件夹,如果经常要搜索整个一条数据,列族越少越好,如果只有一部分的数据需要经常被搜索,那么将经常搜索到的建立一个列族,其他不常搜索的建立列族检索比较快

12. HBase做即席查询,如何设计二级索引

二级索引诞生的缘由:

  • 通过Rowkey查询数据,Rowkey里面会组合固定查询条件,但是需要把多组合查询的字段都拼接在Rowkey中,这是不可能的。
  • 通过Scan全部扫描符合条件的数据,这样的效率是非常低的

所谓二级索引,即原始数据存在HBase,索引存在ES中

主要流程:

  1. 将原始数据存入HBase
  2. 将需要查询的条件字段及RowKey存入ES
  3. 客户端发送请求会根据组合查询条件去ES中查找对应的RowKey
  4. ES返回RowKey给客户端
  5. 客户端根据ES返回的结果(RowKey)查询HBase数据
  6. HBase返回符合条件的数据给客户端

13. HBase中的RegionServer发生故障后的处理方法

HBase检测宕机是通过Zookeeper实现的,正常情况下RegionServer会周期性向ZK发送心跳,一旦发生宕机,心跳就会停止,超过一定时间,ZK就会认为RegionServer宕机离线,并将消息通知给HMaster,HMaster将故障RegionServer管理的Region重新分配到集群中

Round 3:Kafka

1、为什么要使用 kafka?

1)缓冲和削峰:上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,kafka在中间可以起到一个缓冲的作用,把消息暂存在kafka中,下游服务就可以按照自己的节奏进行慢慢处理。

2)解耦和扩展性:项目开始的时候,并不能确定具体需求。消息队列可以作为一个接口层,解耦重要的业务流程。只需要遵守约定,针对数据编程即可获取扩展能力。

3)冗余:可以采用一对多的方式,一个生产者发布消息,可以被多个订阅topic的服务消费到,供多个毫无关联的业务使用。

4)健壮性:消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会影响主要业务的正常进行。

5)异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

2、Kafka消费过的消息如何再消费?

kafka消费消息的offset是定义在zookeeper中的, 如果想重复消费kafka的消息,可以在redis中自己记录offset的checkpoint点(n个),当想重复消费消息时,通过读取redis中的checkpoint点进行zookeeper的offset重设,这样就可以达到重复消费消息的目的了

3、kafka的数据是放在磁盘上还是内存上,为什么速度会快?

kafka使用的是磁盘存储。
速度快是因为:

1)顺序写入:因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是耗时的。所以硬盘 “讨厌”随机I/O, 喜欢顺序I/O。为了提高读写硬盘的速度,Kafka就是使用顺序I/O。

2)Memory Mapped Files(内存映射文件):64位操作系统中一般可以表示20G的数据文件,它的工作原理是直接利用操作系统的Page来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上。

3)Kafka高效文件存储设计: Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。通过索引信息可以快速定位 message和确定response的 大 小。通过index元数据全部映射到memory(内存映射文件), 可以避免segment file的IO磁盘操作。通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。

注:

1)Kafka解决查询效率的手段之一是将数据文件分段,比如有100条Message,它们的offset是从0到99。假设将数据文件分成5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中 小的offset命名。这样在查找指定offset的 Message的时候,用二分查找就可以定位到该Message在哪个段中。

2)为数据文件建 索引数据文件分段 使得可以在一个较小的数据文件中查找对应offset的Message 了,但是这依然需要顺序扫描才能找到对应offset的Message。 为了进一步提高查找的效率,Kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。

4、Kafka数据怎么保障不丢失?

分三个点说,一个是生产者端,一个消费者端,一个broker端。

1)生产者数据的不丢失

kafka的ack机制:在kafka发送数据的时候,每次发送消息都会有一个确认反馈机制,确保消息正常的能够被收到,其中状态有0,1,-1。
如果是同步模式:
ack设置为0,风险很大,一般不建议设置为0。即使设置为1,也会随着leader宕机丢失数据。所以如果要严格保证生产端数据不丢失,可设置为-1。
如果是异步模式:
也会考虑ack的状态,除此之外,异步模式下的有个buffer,通过buffer来进行控制数据的发送,有两个值来进行控制,时间阈值与消息的数量阈值,如果buffer满了数据还没有发送出去,有个选项是配置是否立即清空buffer。可以设置为-1,永久阻塞,也就数据不再生产。异步模式下,即使设置为-1。也可能因为程序员的不科学操作,操作数据丢失,比如kill -9,但这是特别的例外情况。
注:
ack=0:producer不等待broker同步完成的确认,继续发送下一条(批)信息。
ack=1(默认):producer要等待leader成功收到数据并得到确认,才发送下一条message。
ack=-1:producer得到follwer确认,才发送下一条数据。

2)消费者数据的不丢失

通过offset commit 来保证数据的不丢失,kafka自己记录了每次消费的offset数值,下次继续消费的时候,会接着上次的offset进行消费。
而offset的信息在kafka0.8版本之前保存在zookeeper中,在0.8版本之后保存到topic中,即使消费者在运行过程中挂掉了,再次启动的时候会找到offset的值,找到之前消费消息的位置,接着消费,由于 offset 的信息写入的时候并不是每条消息消费完成后都写入的,所以这种情况有可能会造成重复消费,但是不会丢失消息。
唯一例外的情况是,我们在程序中给原本做不同功能的两个consumer组设置 KafkaSpoutConfig.bulider.setGroupid的时候设置成了一样的groupid,这种情况会导致这两个组共享同一份数据,就会产生组A消费partition1,partition2中的消息,组B消费partition3的消息,这样每个组消费的消息都会丢失,都是不完整的。 为了保证每个组都独享一份消息数据,groupid一定不要重复才行。

3)kafka集群中的broker的数据不丢失

每个broker中的partition我们一般都会设置有replication(副本)的个数,生产者写入的时候首先根据分发策略(有partition按partition,有key按key,都没有轮询)写入到leader中,follower(副本)再跟leader同步数据,这样有了备份,也可以保证消息数据的不丢失。

9、采集数据为什么选择kafka?
采集层 主要可以使用Flume, Kafka等技术。
Flume:Flume 是管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展API.
Kafka:Kafka是一个可持久化的分布式的消息队列。 Kafka 是一个非常通用的系统。你可以有许多生产者和很多的消费者共享多个主题Topics。
相比之下,Flume是一个专用工具被设计为旨在往HDFS,HBase发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。
所以,Cloudera 建议如果数据被多个系统消费的话,使用kafka;如果数据被设计给Hadoop使用,使用Flume。

10、kafka 重启是否会导致数据丢失?

1)kafka是将数据写到磁盘的,一般数据不会丢失。

2)但是在重启kafka过程中,如果有消费者消费消息,那么kafka如果来不及提交offset,可能会造成数据的不准确(丢失或者重复消费)。

13、kafka 宕机了如何解决?

1)先考虑业务是否受到影响
kafka 宕机了,首先我们考虑的问题应该是所提供的服务是否因为宕机的机器而受到影响,如果服务提供没问题,如果实现做好了集群的容灾机制,那么这块就不用担心了。

2)节点排错与恢复
想要恢复集群的节点,主要的步骤就是通过日志分析来查看节点宕机的原因,从而解决,重新恢复节点。

16、为什么Kafka不支持读写分离?

在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从 而实现的是一种主写主读的生产消费模型。 Kafka 并不支持主写从读,因为主写从读有 2 个很明显的缺点:

1)数据一致性问题:数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间 窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中 A 数据的值都为 X, 之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。

2)延时问题:类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经历 网络→主节点内存→网络→从节点内存 这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历 网络→主节点内存→主节点磁盘→网络→从节 点内存→从节点磁盘 这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。

而kafka的主写主读的优点就很多了:

1)可以简化代码的实现逻辑,减少出错的可能;
2)将负载粒度细化均摊,与主写从读相比,不仅负载效能更好,而且对用户可控;
3)没有延时的影响;
4)在副本稳定的情况下,不会出现数据不一致的情况。

9、kafka数据分区和消费者的关系?

每个分区只能由同一个消费组内的一个消费者(consumer)来消费,可以由不同的消费组的消费者来消费,同组的消费者则起到并发的效果。

10、kafka的数据offset读取流程

  1. 连接ZK集群,从ZK中拿到对应topic的partition信息和partition的Leader的相关信息
  2. 连接到对应Leader对应的broker
  3. consumer将⾃自⼰己保存的offset发送给Leader
  4. Leader根据offset等信息定位到segment(索引⽂文件和⽇日志⽂文件)
  5. 根据索引⽂文件中的内容,定位到⽇日志⽂文件中该偏移量量对应的开始位置读取相应⻓长度的数据并返回给consumer

30、kafka内部如何保证顺序,结合外部组件如何保证消费者的顺序?

kafka只能保证partition内是有序的,但是partition间的有序是没办法的。爱奇艺的搜索架构,是从业务上把需要有序的打到同⼀个partition。

13、Kafka消息数据积压,Kafka消费能力不足怎么处理?

1)如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。(两者缺一不可)

2)如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据积压。

14、Kafka单条日志传输大小

kafka对于消息体的大小默认为单条最大值是1M但是在我们应用场景中, 常常会出现一条消息大于1M,如果不对kafka进行配置。则会出现生产者无法将消息推送到kafka或消费者无法去消费kafka里面的数据, 这时我们就要对kafka进行以下配置:server.properties
replica.fetch.max.bytes: 1048576 broker可复制的消息的最大字节数, 默认为1M
message.max.bytes: 1000012 kafka 会接收单个消息size的最大限制, 默认为1M左右
注意:message.max.bytes必须小于等于replica.fetch.max.bytes,否则就会导致replica之间数据同步失败。

Kafka经典面试集锦

1. kafka名次解释和工作方式

  • Producer :消息生产者,就是向 kafka broker 发消息的客户端。
  • Consumer :消息消费者,向 kafka broker 取消息的客户端
  • Topic :可以理解为一个队列,生产者和消费这面向的都是一个Topic
  • Consumer Group (CG):这是Kafka用来实现一个Topic消息的广播(发给所有Consumer)和单播(发给任意一个Consumer)的手段。一个Topic可以有多个CG。Topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但是每个Partion只会把消息发给该CG中的一个Consumer。如果需要实现广播,只要每个Consumer有一个独立的CG就可以了。要实现单播只要所有的Consumer在同一个CG。用CG还可以将Consumer进行自由的分组而不需要多次发送消息到不同的Topic
  • Broker:一台Kafka服务器就是一个Borker。一个集群由多个Borker组成。一个Borker可以容纳多个Topic
  • Partition:为了实现扩展性,一个非常大的Topic可以分布到多个Broker上,一个Topic可以分为多个Partition,每个Partition是一个有序的队列。Partition中的每条消息都会被分配一个有序的ID(Offset)。Kafka只保证按一个Partition中的顺序将消息发给Consumer,不保证一个Topic的整体(多个Partition)的顺序
  • Offset:Kafka的存储文件都是按照offset.kafka来命名,用Offset做名字的好处是方便查找。如想找100的位置,只要找到1000.kafka的文件即可

2. Consumer与Topic的关系

本质上Kafka只支持Topic

每个Group中可以有多个Consumer,每个Consumer属于一个Consumer Group

通常情况下,一个 group 中会包含多个 consumer,这样不仅可以提高 topic 中消息的并发消 费能力,而且还能提高"故障容错"性,如果 group 中的某个 consumer 失效那么其消费的 partitions 将会有其他 consumer 自动接管

对于 Topic 中的一条特定的消息,只会被订阅此 Topic 的每个 group 中的其中一个 consumer 消费,此消息不会发送给一个 group 的多个 consumer

一个Group中所有的Consumer将会交错的消费整个Topic,每个Group中Consumer消息消费相互独立,我们可以认为一个Group是一个“订阅者”

在Kafka中,一个Partition中的消息只会被Group中的一个Consumer消费(同一时刻)

一个Topic中的每个Partitions,只会被一个“订阅者”中的一个Consumer消费,不过一个COnsumer可以同时消费多个Patitions中的消息

Kafka的设计原理决定该,对于一个Topic,同一个Group中不能有多个Parititons个数的Consumer同时消费,否则将意味着某些Consumer将无法得到消息

Kafka只能保证一个Partition中的消息被某个Consumer消费时是顺序的;事实上,从Topic角度来说,当有多个Partition时,消息仍不是全局有序的

3. kafka中生产数据的时候,如何保证写入的容错性

设置发送数据是否需要服务端的反馈,有三个值 0,1,-1

0: producer 不会等待 broker 发送 ack
1: 当 leader 接收到消息之后发送 ack
-1: 当所有的 follower 都同步消息成功后发送

ack request.required.acks=0

4. 如何保证 kafka 消费者消费数据是全局有序的

每个分区内,每条消息都有一个 offset,故只能保证分区内有序

如果要全局有序的,必须保证生产有序,存储有序,消费有序

由于生产可以做集群,存储可以分片,消费可以设置为一个 consumerGroup,要保证全局有 序,就需要保证每个环节都有序
只有一个可能,就是一个生产者,一个 partition,一个消费者。这种场景和大数据应用场景相悖

5. 为什么离线分析要用 kafka?

Kafka的作用时解耦,如果直接从日志服务器上采集的话,实时离线都要采集,等于要采集两份数据,而使用了Kafka的话,只需要在日志服务器上采集一份数据,然后在Kafka中使用不同的两个组读取就行了

6. Kafka怎么进行监控

Kafka Manager

7. Kafka额ISR副本同步队列

ISR(In-Sync Replicas),副本同步队列。ISR 中包括 Leader 和 Follower。如果 Leader 进程挂掉,会在 ISR 队列中选择一个服务作为新的 Leader。有 replica.lag.max.messages(延迟条数)和 replica.lag.time.max.ms(延迟时间)两个参数决定一台服务是否可以加入 ISR 副本队列, 在 0.10 版本移除了 replica.lag.max.messages 参数,防止服务频繁的进去队列。 任意一个维度超过阈值都会把 Follower 剔除出 ISR,存入 OSR(Outof-Sync Replicas)列表, 新加入的 Follower 也会先存放在 OSR 中

8. Kafka消息数据积压,Kafka消费能力不足怎么处理

  • 如果是Kafka消费能力不足,可以考虑增加Topic的分区数,并且同时提升消费族的消费者数量,消费者数=分区数(两者缺一不可)
  • 如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据积压

9. Kafka中的ISR、AR代表什么

ISR:in-sync replicas set(ISR),与Leader保持同步的Follower集合

AR:分区的所有副本

10. Kafka中的HW、LEO分别代表什么

LEO:每个副本的最后一条消息的偏移量

HW:一个分区中所有副本中最小的偏移量

11. 哪些情景会造成消息漏消费

先提交Offset,后消费,有可能造成消息漏消费

12. 当使用kafka-topic.sh创建一个Topic后,Kafka背后会执行什么逻辑

  1. 会在Zookeeper中的/broker/topic节点创建一个新的topic节点,如:/brokers/topics/first
  2. 触发Controller的监听程序
  3. Kafka Controller负责Topic的创建工作,并更新Metadata Cache

13. Topic的分区数可不可以增加?如果可以,怎么增加

可以增加

bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic topic-config --partitions 3

14. Topic的分区数可不可以减少?如果可以,怎么减少;若不可以,那是为什么

不可以减少,被删除的的分区数据难以处理

15. Kafka有内部Topic吗?如果有是什么?有什么所用?

__consumer_offsets

作用:保存消费者 offset

16. Kafka Controller的作用

负责管理集群Broker的上下线,所有Topic的分区副本分配和Leader选举等工作

17. 失效副本是指什么?有哪些应对措施

不能及时与Leader同步,暂时踢出ISR,等其追上Leader之后再重新加入

18. Kafka有哪些特点?

高吞吐、低延迟:Kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒

可扩展性:Kafka集群支持热扩展

持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失

容错性:允许集群中节点失败(若副本数量为n,允许n-1个节点失败)

高并发:支持数千个客户端同时读写

19. 请简述你在哪些场景下会选择Kafka?

  • 日志手机:一个公司可以用Kafka可以收集各种服务的Log,通过Kafka以统一接口服务的方式开放给各种Consumer,例如Hadoop、HBase、Solr等

  • 消息系统:解耦和生产者和消费者、缓存消息等

  • 用户活动跟踪:Kafka经常被用来记录Web用户或者App用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到Kafka的Topic中,然后订阅者通过订阅这些Topic来做实时的监控分析,或者装载到Hadoop、数据仓库中做离线分析和挖掘

  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的几种反馈,比如报警和报告

  • 流式处理:比如Spark Streaming和Flink

20. Kafka的设计架构

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3nCONzJi-1688880579947)(img/Kafka详细架构.png)]

21. Kafka分区的目的

分区对于kafka集群的好处:负载均衡。分区对于消费者来说,可以提高并发度

22. Kafka如何做到消息的有序性

Kafka中的每个Partition中的消息在写入时都是有序的,而且消息带有Offset偏移量,消费者按偏移量的顺序从前往后消费,从而保证了消息的顺序性。但是分区之间的消息是不保证有序的

23. LEO、HW、LSO、LW等分别代表什么

**LEO:**是 LogEndOffset 的简称,代表当前日志文件中下一条

**HW:**水位或水印(watermark)一词,也可称为高水位(high watermark),通常被用在流式处理领域(比如 Apache Flink、Apache Spark 等),以表征元素或事件在基于时间层面上的进 度。在 Kafka 中,水位的概念反而与时间无关,而是与位置信息相关。严格来说,它表示的 就是位置信息,即位移(offset)。取 partition 对应的 ISR 中 最小的 LEO 作为 HW,consumer 最多只能消费到 HW 所在的位置上一条信息。
**LSO:**是 LastStableOffset 的简称,对未完成的事务而言,LSO 的值等于事务中第一条消息 的位置(firstUnstableOffset),对已完成的事务而言,它的值同 HW 相同
**LW:**Low Watermark 低水位, 代表 AR 集合中最小的 logStartOffset 值

24. Kafka在什么情况下会出现消息丢失

  1. Producer把消息发送给Broker,因为网络抖动,消息未到达Broker

    Producer设置asks参数,消息同步到Master后返回ask信号,否则抛出异常使应用程序感知到并在业务中进行重试发送

  2. Producer把消息发送给Broker-Master,Master接收到消息,在未将消息同步给Follower之前,挂掉了

    producer设置acks参数,消息同步到master且同步到所有follower之后返回ack信号,否则抛异常使应用程序感知到并在业务中进行重试发送

  3. producer把消息发送给broker-master,master接收到消息,master未成功将消息同步给每个follower

    解决方案同问题2

  4. 某个broker消息尚未从内存缓冲区持久化到磁盘,就挂掉了,这种情况无法通过ack机制感知

    设置参数,加快消息持久化频率。影响性能

  5. Consumer成功拉取到了消息,Consumer挂了

    设置手动Sync,消费成功才提交

25. 数据传输的事务有几种

数据传输的事务定义通常有以下三种级别:

最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输

最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输.

精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都被传输

26. Kafka消费者是否可以消费指定分区消息

KafkaConsumer消费消息时,向Broker发出Fetch请求去消费特定分区的消息,Consumer指定消息在日志中的偏移量(offset),就可以消费从这个位置开始的消息,Customer拥有了Offset的控制权,可以向后回滚去重新消费之前的消息

27. Kafka消息是采用Pull模式,还是Push模式

Producer将消息推送到Broker,Consumer从Broker拉取消息

28. Kafka高效文件存储设计特点

Kafka把Topic中一个Partition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用

通过索引信息可以快速定位Message和确定Response的最大大小

通过index元数据全部映射到Memory,可以避免SegmentFile的IO磁盘操作

通过索引文件稀疏存储,可以大幅降低Index文件元数据占用空间大小

29. Kafka创建Topic时如何将分区放置到不同的Broker中

副本因子不能大于Broker的个数

第一个分区(编号为0)的第一个副本放置位置是随机从BrokerList选择的

其他分区的第一个副本放置位置相对于第0个分区依次往后移。也就是如果我们有5个Broker,5个分区,假设第一个分区放在第四个Broker上,那么第二个分区将会放在第五个Broker上;第三个分区将会放在第一个Broker上;第四个分区将会放在第二个Broker上,依次类推

剩余的副本相对于第一个副本放置位置其实是由nextReplicaShift决定的,而这个数也是随机产生的

30. Kafka新建的分区会在哪个目录下创建

在启动 Kafka 集群之前,我们需要配置好 log.dirs 参数,其值是 Kafka 数据的存放目录,这个参数可以配置多个目录,目录之间使用逗号分隔,通常这些目录是分布在不同的磁盘上用于提高读写性能。当然我们也可以配置 log.dir 参数,含义一样。只需要设置其中一个即可

如果 log.dirs 参数只配置了一个目录,那么分配到各个 Broker 上的分区肯定只能在这个目 录下创建文件夹用于存放数据

但是如果 log.dirs 参数配置了多个目录,那么 Kafka 会在哪个文件夹中创建分区目录呢?

答案是:Kafka 会在含有分区目录最少的文件夹中创建新的分区目录,分区目录名为 Topic 名+分区 ID。注意,是分区文件夹总数最少的目录,而不是磁盘使用量最少的目录!也就是 说,如果你给 log.dirs 参数新增了一个新的磁盘,新的分区目录肯定是先在这个新的磁盘上 创建直到这个新的磁盘目录拥有的分区目录不是最少为止

31. Kafka的再均衡

在Kafka中,当有新消费者加入或者订阅的Topic数发生变化时,会触发Rebalance(再均衡:在同一个消费者组中,分区的所有权从一个消费者转移到另一个消费者)机制

Rebalance的过程如下:

  1. 所有成员都向corrdinator发送请求,请求入组。一旦所有成员都发送了请求,corrdinator会从中选择一个Consumer担任Leader的角色,并把组成员信息以及订阅信息发给Leader
  2. Leader开始分配消费方案,指明具体哪个Consumer负责消费哪些Topic的哪些Partition。一旦完成分配,Leader会将这个方案发给corrdinator。corrdinator接收到分配方案之后会把方案发给各个Consumer,这样组内的所有成员就都知道自己应该消费哪些分区了

32. Kafka时如何实现高吞吐率的

Kafka 是分布式消息系统,需要处理海量的消息,Kafka 的设计是把所有的消息都写入速度低容量大的硬盘,以此来换取更强的存储能力,但实际上,使用硬盘并没有带来过多的性能损失。kafka 主要使用了以下几个方式实现了超高的吞吐率:

  • 顺序读写
  • 零拷贝
  • 文件分段
  • 批量发送
  • 数据压缩

33. Kafka监控都有哪些

比较流行的监控工具有:
KafkaOffsetMonitor

KafkaManager

Kafka Web Console

Kafka Eagle

JMX 协议(可以用诸如 jdk 自带的 jconsole 来进行连接获取状态信息)

34. Kafka缺点

数据是批量发送,并非真正实时

对于mqtt协议不支持

不支持物联网传感数据直接接入

仅支持统一分区内消息有序,无法实现全局消息有序

监控不完善,需要安装插件

依赖ZK进行元数据管理

35. Kafka消息的存储机制

Kafka通过Topic来分主题存放数据,主题内有分区,分区可以有多个副本,分区的内部还细分为若干个segment。都是持久化到磁盘,采用零拷贝技术

  1. 高效检索

    分区下面,会进行分段操作,每个分段都会有对应的索引,这样就可以根据offset二分查找定位到消息在哪一段,根据段的索引文件,定位具体的message

  2. 分区副本可用性

    Leader选举,ZK来协调,若Leader宕机,选出了新的Leader并不能保证已经完全同步了之前Leader的所有数据,只能保证HW之前的数据是同步过的,此时所有的Follower都要将数据截断到HW的位置,在和新的Leader同步数据,来保证数据一致

    当宕机的Leader恢复,发现新的Leader中的数据和自己持有的数据不一致,此时宕机的Leadre会将自己的数据截断到宕机前的HW位置,然后同步新Leader的数据。宕机的Leader活过来也像Follower一样同步数据,来保证数据的一致性

Round 4:Flink

1、简单介绍一下Flink

Flink是一个面向流处理和批处理的分布式数据计算引擎,能够基于同一个Flink运行,可以提供流处理和批处理两种类型的功能。 在 Flink 的世界观中,一切都是由流组成的,离线数据是有界的流;实时数据是一个没有界限的流:这就是所谓的有界流和无界流。

2、Flink的运行必须依赖Hadoop组件吗

Flink可以完全独立于Hadoop,在不依赖Hadoop组件下运行。但是做为大数据的基础设施,Hadoop体系是任何大数据框架都绕不过去的。Flink可以集成众多Hadooop 组件,例如Yarn、Hbase、HDFS等等。例如,Flink可以和Yarn集成做资源调度,也可以读写HDFS,或者利用HDFS做检查点。

3、Flink集群运行时角色
Flink 运行时由两种类型的进程组成:一个 JobManager 和一个或者多个 TaskManager。

Client 不是运行时和程序执行的一部分,而是用于准备数据流并将其发送给 JobManager。之后,客户端可以断开连接(分离模式),或保持连接来接收进程报告(附加模式)。客户端可以作为触发执行 Java/Scala 程序的一部分运行,也可以在命令行进程 ./bin/flink run … 中运行。
可以通过多种方式启动 JobManager 和 TaskManager:直接在机器上作为 standalone 集群启动、在容器中启动、或者通过YARN等资源框架管理并启动。TaskManager 连接到 JobManagers,宣布自己可用,并被分配工作。
JobManager:
JobManager 具有许多与协调 Flink 应用程序的分布式执行有关的职责:它决定何时调度下一个 task(或一组 task)、对完成的 task 或执行失败做出反应、协调 checkpoint、并且协调从失败中恢复等等。这个进程由三个不同的组件组成:
• ResourceManager
ResourceManager 负责 Flink 集群中的资源提供、回收、分配,管理 task slots。
• Dispatcher
Dispatcher 提供了一个 REST 接口,用来提交 Flink 应用程序执行,并为每个提交的作业启动一个新的 JobMaster。它还运行 Flink WebUI 用来提供作业执行信息。
• JobMaster
JobMaster 负责管理单个JobGraph的执行。Flink 集群中可以同时运行多个作业,每个作业都有自己的 JobMaster。
TaskManagers:
TaskManager(也称为 worker)执行作业流的 task,并且缓存和交换数据流。
必须始终至少有一个 TaskManager。在 TaskManager 中资源调度的最小单位是 task slot。TaskManager 中 task slot 的数量表示并发处理 task 的数量。请注意一个 task slot 中可以执行多个算子。

4、Flink相比Spark Streaming有什么区别

1)架构模型

Spark Streaming 在运行时的主要角色包括:Master、Worker、Driver、Executor,Flink 在运行时主要包含:Jobmanager、Taskmanager 和 Slot。

2)任务调度

Spark Streaming 连续不断的生成微小的数据批次,构建有向无环图 DAG,Spark Streaming 会依次创建 DStreamGraph、JobGenerator、JobScheduler。
Flink 根据用户提交的代码生成 StreamGraph,经过优化生成 JobGraph,然后提交给 JobManager 进行处理,JobManager 会根据 JobGraph 生成 ExecutionGraph,ExecutionGraph 是 Flink 调度最核心的数据结构,JobManager 根据 ExecutionGraph 对 Job 进行调度。

3)时间机制

Spark Streaming 支持的时间机制有限,只支持处理时间。Flink 支持了流处理程序在时间上的三个定义:处理时间、事件时间、注入时间。同时也支持 watermark 机制来处理滞后数据。

4)容错机制

对于 Spark Streaming 任务,我们可以设置 checkpoint,然后假如发生故障并重启,我们可以从上次 checkpoint 之处恢复,但是这个行为只能使得数据不丢失,可能会重复处理,不能做到恰一次处理语义。
Flink 则使用两阶段提交协议来解决这个问题。

5、Flink 的常用算子?

Flink 最常用的常用算子包括:Map:DataStream → DataStream,输入一个参数产生一个参数,map的功能是对输入的参数进行转换操作。Filter:过滤掉指定条件的数据。KeyBy:按照指定的key进行分组。Reduce:用来进行结果汇总合并。Window:窗口函数,根据某些特性将每个key的数据进行分组(例如:在5s内到达的数据)

6、你知道的Flink分区策略?

什么要搞懂什么是分区策略。分区策略是用来决定数据如何发送至下游。目前 Flink 支持了8中分区策略的实现。

GlobalPartitioner 数据会被分发到下游算子的第一个实例中进行处理。

ShufflePartitioner 数据会被随机分发到下游算子的每一个实例中进行处理。

RebalancePartitioner 数据会被循环发送到下游的每一个实例中进行处理。

RescalePartitioner 这种分区器会根据上下游算子的并行度,循环的方式输出到下游算子的每个实例。这里有点难以理解,假设上游并行度为2,编号为A和B。下游并行度为4,编号为1,2,3,4。那么A则把数据循环发送给1和2,B则把数据循环发送给3和4。假设上游并行度为4,编号为A,B,C,D。下游并行度为2,编号为1,2。那么A和B则把数据发送给1,C和D则把数据发送给2。

BroadcastPartitioner 广播分区会将上游数据输出到下游算子的每个实例中。适合于大数据集和小数据集做Jion的场景。

ForwardPartitioner ForwardPartitioner 用于将记录输出到下游本地的算子实例。它要求上下游算子并行度一样。简单的说,ForwardPartitioner用来做数据的控制台打印。

KeyGroupStreamPartitioner Hash分区器。会将数据按 Key 的 Hash 值输出到下游算子实例中。

CustomPartitionerWrapper 用户自定义分区器。需要用户自己实现Partitioner接口,来定义自己的分区逻辑。例如:

static classCustomPartitionerimplementsPartitioner<String> {
    @Override
    publicintpartition(String key, int numPartitions) {
        switch (key){
            case "1":
                return 1;
            case "2":
                return 2;
            case "3":
                return 3;
            default:
                return 4;
        }
    }
}

7、Flink的并行度了解吗?Flink的并行度设置是怎样的?

Flink中的任务被分为多个并行任务来执行,其中每个并行的实例处理一部分数据。这些并行实例的数量被称为并行度。

我们在实际生产环境中可以从四个不同层面设置并行度:

操作算子层面(Operator Level)

执行环境层面(Execution Environment Level)

客户端层面(Client Level)

系统层面(System Level)

需要注意的优先级:算子层面>环境层面>客户端层面>系统层面。

8、Flink的Slot和parallelism有什么区别?

官网上十分经典的图:

slot是指taskmanager的并发执行能力,假设我们将 taskmanager.numberOfTaskSlots 配置为3 那么每一个 taskmanager 中分配3个 TaskSlot, 3个 taskmanager 一共有9个TaskSlot。

parallelism是指taskmanager实际使用的并发能力。假设我们把 parallelism.default 设置为1,那么9个 TaskSlot 只能用1个,有8个空闲。

9、Flink有没有重启策略?说说有哪几种?

Flink 实现了多种重启策略。

固定延迟重启策略(Fixed Delay Restart Strategy)

故障率重启策略(Failure Rate Restart Strategy)

没有重启策略(No Restart Strategy)

Fallback重启策略(Fallback Restart Strategy)

10、用过Flink中的分布式缓存吗?如何使用?

Flink实现的分布式缓存和Hadoop有异曲同工之妙。目的是在本地读取文件,并把他放在 taskmanager 节点中,防止task重复拉取。

val env = ExecutionEnvironment.getExecutionEnvironment
 
// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
 
// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
 
// define your program and execute
...
val input: DataSet[String] = ...
val result: DataSet[Integer] = input.map(new MyMapper())
...
env.execute()

11、说说Flink中的广播变量,使用时需要注意什么?

我们知道Flink是并行的,计算过程可能不在一个 Slot 中进行,那么有一种情况即:当我们需要访问同一份数据。那么Flink中的广播变量就是为了解决这种情况。

我们可以把广播变量理解为是一个公共的共享变量,我们可以把一个dataset 数据集广播出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存在一份。

12、说说Flink中的窗口?

来一张官网经典的图:

Flink 支持两种划分窗口的方式,按照time和count。如果根据时间划分窗口,那么它就是一个time-window 如果根据数据划分窗口,那么它就是一个count-window。

flink支持窗口的两个重要属性(size和interval)

如果size=interval,那么就会形成tumbling-window(无重叠数据) 如果size>interval,那么就会形成sliding-window(有重叠数据) 如果size< interval, 那么这种窗口将会丢失数据。比如每5秒钟,统计过去3秒的通过路口汽车的数据,将会漏掉2秒钟的数据。

通过组合可以得出四种基本窗口:

time-tumbling-window 无重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5))

time-sliding-window 有重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5), Time.seconds(3))

count-tumbling-window无重叠数据的数量窗口,设置方式举例:countWindow(5)

count-sliding-window 有重叠数据的数量窗口,设置方式举例:countWindow(5,3)

13、说说Flink中的状态存储?

Flink在做计算的过程中经常需要存储中间状态,来避免数据丢失和状态恢复。选择的状态存储策略不同,会影响状态持久化如何和 checkpoint 交互。

Flink提供了三种状态存储方式:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。

14、Flink 中的时间有哪几类

Flink 中的时间和其他流式计算系统的时间一样分为三类:事件时间,摄入时间,处理时间三种。

如果以 EventTime 为基准来定义时间窗口将形成EventTimeWindow,要求消息本身就应该携带EventTime。如果以 IngesingtTime 为基准来定义时间窗口将形成 IngestingTimeWindow,以 source 的systemTime为准。如果以 ProcessingTime 基准来定义时间窗口将形成 ProcessingTimeWindow,以 operator 的systemTime 为准。

15、Flink 中水印是什么概念,起到什么作用?

Watermark 是 Apache Flink 为了处理 EventTime 窗口计算提出的一种机制, 本质上是一种时间戳。 一般来讲Watermark经常和Window一起被用来处理乱序事件。

16、Flink 是如何保证Exactly-once语义的?

Flink通过实现两阶段提交和状态保存来实现端到端的一致性语义。 分为以下几个步骤:

开始事务(beginTransaction)创建一个临时文件夹,来写把数据写入到这个文件夹里面

预提交(preCommit)将内存中缓存的数据写入文件并关闭

正式提交(commit)将之前写完的临时文件放入目标目录下。这代表着最终的数据会有一些延迟

丢弃(abort)丢弃临时文件

若失败发生在预提交成功后,正式提交前。可以根据状态来提交预提交的数据,也可删除预提交的数据。

17、Flink中的Window出现了数据倾斜,你有什么解决办法?

window产生数据倾斜指的是数据在不同的窗口内堆积的数据量相差过多。本质上产生这种情况的原因是数据源头发送的数据量速度不同导致的。出现这种情况一般通过两种方式来解决:

  • 在数据进入窗口前做预聚合
  • 重新设计窗口聚合的key

18、Flink是如何处理反压的?

Flink 内部是基于 producer-consumer 模型来进行消息传递的,Flink的反压设计也是基于这个模型。Flink 使用了高效有界的分布式阻塞队列,就像 Java 通用的阻塞队列(BlockingQueue)一样。下游消费者消费变慢,上游就会受到阻塞。

预告篇:

第三篇:MPP数据库

第四篇:大数据BI工具

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

2023年大数据面试通关文牒系列篇 的相关文章

随机推荐

  • 基于matlab的矩阵奇异值(SVD)分解

    目录 1 计算原理 1 1求解V 1 2求解D 1 3求解U 2 MATLAB程序 2 1 注意 1 计算原理 设矩阵A的大小m n m gt n A UD 1 1求解V 首先求出的特征值及特征值 对应的正交单位特征向量 将的特征值从大到小
  • 小米在建IoT护城河Vela NuttX

    MIDC 2020小米开发者大会刚刚过去 整场大会下来 几个印象比较深刻的点是 雷军宣布扩招5000名工程师 最新伸缩式大光圈镜头技术 小爱同学5 0发布 当然了 还有一个更加值得被提及的重点是 首次亮相的Xiaomi Vela物联网软件平
  • 本地系统盘放到服务器上,如何将本地盘映射在云服务器上

    如何将本地盘映射在云服务器上 内容精选 换一换 华为云帮助中心 为用户提供产品简介 价格说明 购买指南 用户指南 API参考 最佳实践 常见问题 视频帮助等技术文档 帮助您快速上手使用华为云服务 云服务器组是对云服务器的一种逻辑划分 云服务
  • 人工智能AI 全栈体系(三)

    第一章 神经网络是如何实现的 一个神经网络用不同的数据做训练 就可以识别不同的东西 那么神经网络究竟是怎么训练的 三 神经网络是如何训练的 1 小朋友如何认识小动物 小时候 每当看到一个小动物时 妈妈就会告诉我这是什么动物 见得多了 慢慢地
  • Docker 安装mysql8

    基础命令 docker stop start restart 停止 启动 重启 查看当前运行镜像 docker ps 查看所有镜像 docker ps a 删除镜像 docker rmi f强制删除 image docker安装MySQL
  • [面向对象]

    学习大纲 什么是面向对象 JS中的内置类 专业叫法 构造函数 自定义类 普通函数执行和构造函数执行的区别 原型 原型链查找机制 Function Object 之间的爱恨情仇 面试题 Number方法重构 Object prototype
  • Linux中_使用stat_命令_显示文件或文件系统的详细信息状态

    文章目录 一 stat命令 一 stat命令 stat指令 显示文件 文件系统的状态 详细信息显示 stat命令主要用于显示文件或文件系统的详细信息 该命令的语法格式如下 用法 stat 选项 文件 参数 必选参数对长短选项同时适用 L d
  • 用好React,你必须要知道的事情

    容器性组件 container component 和展示性组件 presentational component 使用React编写组件时 我们需要有意识地将组件划分为容器性组件 container component 和展示性组件 pr
  • Mqtt通信协议详解

    文章目录 1 简介mqtt 2 mqtt协议实现 3 Mqtt数据包 4 QoS等级 5 mqtt传输安全保证 5 1 应用层 5 2 传输层 5 2 1 TLS安全协议 5 3 网络层 1 简介mqtt MQTT Message Queu
  • Unity3D官方教程:WebGL

    什么是 Unity WebGL WebGL工程选项允许Unity以JavaScript程序形式发布使用HTMl5技术和WebGL渲染API 在网页浏览器中运行的Unity内容 为了构建和测试WebGL内容 在 Build Player 窗口
  • 计算机提示vcruntime140.dll丢失是什么意思?vcruntime140.dll丢失的解决方法(详细方法)

    计算机丢失vcruntime140 dll是什么意思 经常看到有小伙伴有在网上问这样的问题 电脑上这个vcruntime140 dll文件丢失的问题经常发生吧 那么就很有必要给大家详细的说说这一方面的问题了 下面我们来看看 第一 vcrun
  • 工具推荐

    关注它 不迷路 本文章中所有内容仅供学习交流 不可用于任何商业用途和非法用途 否则后果自负 如有侵权 请联系作者立即删除 1 在线地址 http 107 151 202 163 8129 2 工具介绍 由于还没想好网站名字 目前仅以ip形式
  • 理解ROS节点和在中.launch文件启动节点

    理解ROS节点 1 图概念概述 Nodes 节点 一个节点即为一个可执行文件 它可以通过ROS与其它节点进行通信 Messages 消息 消息是一种ROS数据类型 用于订阅或发布到一个话题 Topics 话题 节点可以发布消息到话题 也可以
  • 微信小程序云开发入门

    参考链接 1条消息 微信小程序云开发入门详细教程 Yunlord的博客 CSDN博客 微信小程序云开发 1 新建小程序项目 选择使用云开发 进入小程序后点击云开发创建云环境 选择免费版的环境如果提示余额不足欠费进入腾讯云 如果是小程序进入选
  • 【数据库CS751】Union的用法以及含义

    目录 一 Union的含义 二 Union的用法 1 列数一样 这个很重要 2 union与union all 3 union可以排序么 4 别名会影响union么 那么别名会不会影响排序 那么之前说过了各种连接 其实union本质上还是和
  • Backup database plus archivelog delete input;

    Backup database plus archivelog 会备份归档日志Backup database plus archivelog delete input 会备份归档日志并且会在备份结束后删除默认目录下的归档日志 datafil
  • PAT C入门题目-7-17 成绩转换 (15 分)(switch-case)

    7 17 成绩转换 15 分 本题要求编写程序将一个百分制成绩转换为五分制成绩 转换规则 大于等于90分为A 小于90且大于等于80为B 小于80且大于等于70为C 小于70且大于等于60为D 小于60为E 输入格式 输入在一行中给出一个整
  • Python 学习4.1字典

    一 字典内包含的内容是一系列键和它们对应的值 字典放在花括号 中 键和值之间用冒号 分隔 键值对之间用 分隔 想储存多少键值对都可以 eg 输入 a zwj 250 zzy 251 250 zwj print a zwj print a 2
  • 神经网络容易受到对抗攻击,网络攻防原理与技术

    1 信息化战争的作战原则是什么 三项基本原则 一是隐蔽 二是快速 三是高效 隐蔽 就是通过隐身 欺骗等手段 确保 先敌发现 先敌攻击 快速 就是依托情报监视与侦察和快速打击系统 确保 发现即摧毁 高效 就是联合 集中使用高能精确弹药 确保战
  • 2023年大数据面试通关文牒系列篇

    大数据面试通关文牒系列篇 第二篇 Hadoop生态链 Round 1 HIVE HIVE 基础篇 1 Hive内部表和外部表的区别 未被external修饰的是内部表 被external修饰的为外部表 区别 内部表数据由Hive自身管理 外