Kafka到HDFS,除了用Kafka API和flume之外,还可以用kettle,最大优点是不用写代码!
版本:Kettle版本:8.2、Hadoop版本:3.1.3
前提: 详情请看鄙人的一百零一、Kettle8.2.0连接Hive3.1.2(踩坑,亲测有效)
http://t.csdn.cn/mWfOC
http://t.csdn.cn/mWfOC
前提一、Hadoop系列配置文件已复制到kettle路径下 路径为:D:\java\kettle\pdi-ce-8.2.0.0-342\data-integration\plugins\pentaho-big-data-plugin\hadoop-configurations\hdp30
![](https://img-blog.csdnimg.cn/4b7932145f6d4d84801ad5305dd46f1a.png)
注意:以上的配置文件要与自己使用的服务器配置文件一致
前提二、在D:\java\kettle\pdi-ce-8.2.0.0-342\data-integration\plugins\pentaho-big-data-plugin\hadoop-configurations\hdp30\lib文件夹下注意替换jar包
![](https://img-blog.csdnimg.cn/8b5de4d948824ed1ab2b876c2c0f51a9.png)
前提三、在D:\java\kettle\pdi-ce-8.2.0.0-342\data-integration\lib增加MySQL驱动包,注意驱动包版本问题
![](https://img-blog.csdnimg.cn/93c2e91e288f4faaa03a5cc9b0cedbb1.png)
这些准备好之后,下面开始用kettle采集Kafka中的数据到hdfs!
第1步,在kettle中创建新的转换任务。
![](https://img-blog.csdnimg.cn/e8f94d5459254a44b5cddf4cb1c5cd92.png)
第2步,在Streaming中拖拽Kafka consumer控件,并且修改Kafka consumer控件配置信息
首先,Setup模块
Transformation
:最好新建一个,用于返回流中的结果
Connection
:选择direct ,右侧输入bootstrap-server IP地址:端口号9092
Topics
:选择kafka消费的topic
Consumer group
:消费者组,随便填
![](https://img-blog.csdnimg.cn/ae770f4d6f4140628354e3fb178ac4a6.png)
其次,Batch模块(根据自己的实际情况修改参数)
Duration: 处理批次间隔
Number-of-records: 处理批次条数
maximum concurrent batches:最大并发批次(8.2版本没有,9.3版本有)
message prefetch limit:消息预取限制,防止读取数据量太大,造成kettle挂掉(8.2版本没有,9.3版本有)
Offset-management: 偏移量管理
![](https://img-blog.csdnimg.cn/4c25123594c44ceeb1f91cf0f2f94aae.png)
最后,Result fields模块
![](https://img-blog.csdnimg.cn/aa3f4fddb70642f4a0e7a6470887e839.png)
其余,Fields和Options两个模块不需要修改。
第3步,拖拽一个应用模块里面的写日志控件,方便查看Kafka中的字段
![](https://img-blog.csdnimg.cn/0c519cde4e1546b78c7d9e5b641787ed.png)
第4步,拖拽一个转换模块里面的字段选择控件,只取Kafka日志中的Message字段
![](https://img-blog.csdnimg.cn/f0a3926b76da4c5d80f490eeda3bc3d6.png)
第5步,是关键的一步!!! 拖拽输入模块里的JSON input插件
首先,在文件模块页面,勾选源定义在一个字段里,并从Message字段获取源。可以修改步骤名称
![](https://img-blog.csdnimg.cn/3d81633eaada4f34ac8481baf4074670.png)
其次,在字段模块。如果Select fields查不到字段的话,别慌。手动输入即可,不过此处为第一级别的字段,二级字段不在此处。不需要很详细。另外,要注意字段的类型
这是第一个大坑。明明Kafka里有数据,JSON input控件却找不到字段,最后发现可以手动输入字段名。
![](https://img-blog.csdnimg.cn/183b62f7ebaa45d3a3a772cefd57626c.png)
第6步,由于还有二级字段需要解析,因此还要再拖拽一个JSON input字段
首先,拖拽输入模块里的JSON input插件
其次,在文件模块,勾选源定义在一个字段里,并从evaluationList字段获取源
![](https://img-blog.csdnimg.cn/c07b283303494f79b1d701f66fc59305.png)
最后,在字段页面,Select fields字段。如果找不到,手动输入即可。注意,字段类型
![](https://img-blog.csdnimg.cn/bf1ad516edce4c7e9d5cddba297a8774.png)
第7步,为了方便查看目前已有的字段,并且修改一些字段的名称,再次拖拽转换模块里面的字段选择控件
首先,在选择和修改页面,获取选择的字段,并且把不用的字段删掉
![](https://img-blog.csdnimg.cn/3483349269a945ac87f1a462ca1b8880.png)
其次,在元数据页面,修改需要修改的字段名称,注意字段类型以及格式的修改,特别是时间戳类型的字段,不能用string
![](https://img-blog.csdnimg.cn/9094ae9a6dff46169ada56e0fc813e56.png)
第8步,最后一个控件。由于我这边是从Kafka读取数据到HDFS,所以我这边是拖拽Big Data里的Hadoop file output控件
首先,拖拽Big Data里的Hadoop file output控件
![](https://img-blog.csdnimg.cn/2606ebcd54074b5bbbafd22db9fa063a.png)
其次,在文件页面。
![](https://img-blog.csdnimg.cn/5361c47f28ed483c8c48a9c6517ef630.png)
1.选择Hadoop Cluster 如果没有,可以新建一个 (前提是开启了Hadoop)
1.1.点击New新建Hadoop Cluster
cluster name:随便填
storage:选择HDFS
HDFS的Hostname、Port,注意与Hadoop配置文件core-site.xml里填的一模一样
Username和Password就填自己的服务器密码,一般用户名是root,密码就是自己设置的root用户密码
![](https://img-blog.csdnimg.cn/93d68254894a4eecbde08775714b2c7b.png)
注意:如果配置文件core-site.xml里hostname是hurys22,那么kettle里的Hostname也必须是hurys22,而不能是192.168.1.22这种,这是第二个大坑。
![](https://img-blog.csdnimg.cn/5affafae3f1d4b688cf4d0d7d9a5160f.png)
1.2 这部分填好即可,剩下的部分不用填。填好后测试一下
这4个好即可,其他不需要
![](https://img-blog.csdnimg.cn/7c96823f456b4698800af35265a690d4.png)
2.在Folder/File,选择hdfs的文件路径。可以先自己创建好,也可以让它自动生成。不过,自己要先创建好目标文件夹,因为她有个文件权限的问题。这是第三个大坑!!!
其实,经踩坑后发现,目标文件evaluation.csv,kettle任务可以自行创建,不过目标文件夹rtp,如果HDFS没有则必须提前手动创建好,然后给这个文件夹赋权即可。
2.1 创建hdfs目标文件夹
[root@hurys22 soft]# hdfs dfs -mkdir -p /rtp
2.2 文件赋权
[root@hurys22 ~]# hdfs dfs -chmod -R 777 /rtp/
![](https://img-blog.csdnimg.cn/2840a98f9c6c4bbc8b92f5eb76756c84.png)
注意:必须要修改文件夹权限,否则kettle会报错文件权限不够
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=Administrator, access=WRITE, inode="/rtp":root:supergroup:drwxr-xr-x
3.如果自己已经创建好目标文件,那么要勾选 启动时不创建文件
4.文件扩展名填csv
5.一定要指定日期时间格式,一般选择年月日或者年月日时分秒。否则。你第二次导到HDFS的数据会覆盖第一次导出的数据,因为文件路径是一样的 这是第四个大坑!!!
6.一个小技巧,可以通过显示文件名查看即将生成的目标文件名,从而可以确认文件名是否正确
![](https://img-blog.csdnimg.cn/5b1d3951dc104c2db61413d6794f2bb7.png)
7.在内容页面。选择分隔符,一般是,或者; 还有编码:UTF-8
![](https://img-blog.csdnimg.cn/8ed23bb3a168480fbc6e8b4932fc21c1.png)
8.在字段页面,点击获取字段和最小宽度,检查所需字段以及字段类型,注意时间戳字段类型的格式
![](https://img-blog.csdnimg.cn/a1116c44a29d400f92c4dddf4a9e09d0.png)
第9步,在Hadoop file output控件修改好,保存kettle任务,点击运行
![](https://img-blog.csdnimg.cn/430fc8241d324bcebd2451a20b66949a.png)
同时,HDFS会在提前建好的目标文件夹下自动生成目标文件,当任务停止时,文件才会显示数据大小。
![](https://img-blog.csdnimg.cn/391194aebb04451287a682f87106251c.png)
不过,可以直接下载查看
第一步,点击Download
![](https://img-blog.csdnimg.cn/aec77b7c8d82423cb13389b47e8918ed.png)
第二步,在下载文件夹下可以看到刚才下载的文件evaluation.csv以及文件大小![](https://img-blog.csdnimg.cn/f04a2aadfa944a03b956d48e1e6edd8f.png)
第三步,用notepad++打开目标文件evaluation.csv查看数据
![](https://img-blog.csdnimg.cn/f4e0e1d4c2dc4f10ac95b20e5370359e.png)
截止这边,用kettle采集Kafka数据到HDFS中就全部结束了,坑也挺多的。
乐于奉献共享,帮我你我他!!!