flume采集log4j日志到kafka

2023-11-06

简单测试项目:

1、新建Java项目结构如下:

测试类FlumeTest代码如下:

package com.demo.flume;

import org.apache.log4j.Logger;

public class FlumeTest {
    
    private static final Logger LOGGER = Logger.getLogger(FlumeTest.class);

    public static void main(String[] args) throws InterruptedException {
        for (int i = 20; i < 100; i++) {
            LOGGER.info("Info [" + i + "]");
            Thread.sleep(1000);
        }
    }
}

监听kafka接收消息Consumer代码如下:

package com.demo.flume;

/**
 * INFO: info
 * User: zhaokai
 * Date: 2017/3/17
 * Version: 1.0
 * History: <p>如果有修改过程,请记录</P>
 */

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {

    public static void main(String[] args) {
        System.out.println("begin consumer");
        connectionKafka();
        System.out.println("finish consumer");
    }

    @SuppressWarnings("resource")
    public static void connectionKafka() {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.163:9092");
        props.put("group.id", "testConsumer");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("flumeTest"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("===================offset = %d, key = %s, value = %s", record.offset(), record.key(),
                        record.value());
            }
        }
    }
}

log4j配置文件配置如下:

log4j.rootLogger=INFO,console

# for package com.demo.kafka, log would be sent to kafka appender.
log4j.logger.com.demo.flume=info,flume

log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.1.163
log4j.appender.flume.Port = 4141
log4j.appender.flume.UnsafeMode = true
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p [%c:%L] - %m%n
 
# appender console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n

备注:其中hostname为flume安装的服务器IP,port为端口与下面的flume的监听端口相对应

pom.xml引入如下jar:

<dependencies>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.10</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flume.flume-ng-clients</groupId>
        <artifactId>flume-ng-log4jappender</artifactId>
        <version>1.5.0</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.0</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.10.2.0</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-log4j-appender</artifactId>
        <version>0.10.2.0</version>
    </dependency>
    
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>18.0</version>
    </dependency>
</dependencies>

2、配置flume

flume/conf下:

新建avro.conf 文件内容如下:

当然skin可以用任何方式,这里我用的是kafka,具体的skin方式可以看官网

a1.sources=source1
a1.channels=channel1
a1.sinks=sink1

a1.sources.source1.type=avro
a1.sources.source1.bind=192.168.1.163
a1.sources.source1.port=4141
a1.sources.source1.channels = channel1

a1.channels.channel1.type=memory
a1.channels.channel1.capacity=10000
a1.channels.channel1.transactionCapacity=1000
a1.channels.channel1.keep-alive=30

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.topic = flumeTest
a1.sinks.sink1.brokerList = 192.168.1.163:9092
a1.sinks.sink1.requiredAcks = 0
a1.sinks.sink1.sink.batchSize = 20
a1.sinks.sink1.channel = channel1

如上配置,flume服务器运行在192.163.1.163上,并且监听的端口为4141,在log4j中只需要将日志发送到192.163.1.163的4141端口就能成功的发送到flume上。flume会监听并收集该端口上的数据信息,然后将它转化成kafka event,并发送到kafka集群flumeTest topic下。

3、启动flume并测试

  1. flume启动命令:bin/flume-ng agent --conf conf --conf-file conf/avro.conf --name a1 -Dflume.root.logger=INFO,console
  2. 运行FlumeTest类的main方法打印日志
  3. 允许Consumer的main方法打印kafka接收到的数据
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

flume采集log4j日志到kafka 的相关文章

  • Flume实战

    前言 在一个完整的大数据处理系统中 xff0c 除了hdfs 43 mapreduce 43 hive组成分析系统的核心之外 xff0c 还需要数据采集 结果数据导出 任务调度等不可或缺的辅助系统 xff0c 而这些辅助工具在hadoop生
  • SLF4J: Hbase和Flume的slf4j-log4j12-x.x.xx.jar与hadoop的slf4j-log4j12-x.x.xx.jar冲突

    SLF4J Hbase和Flume的slf4j log4j12 x x xx jar与hadoop的slf4j log4j12 x x xx jar冲突 文章目录 SLF4J Hbase和Flume的slf4j log4j12 x x xx
  • Flume系统搭建和使用的一些经验总结-搭建篇

    对于很多公司来说 日志的收集和集中管理是一个必然要经历的阶段 我们公司在经历了一拖再拖之后 终于不得不开始搭建日志收集系统了 对于日志收集系统 我们的首选就是Flume 为何这么坚决呢 难道没有其他工具能做个这个事情么 当然有 不过 考虑到
  • 大数据技术面试-Flume、kafka

    大数据技术面试 Flume kafka 1 Flume组成有哪些 2 Flume拦截器有哪些知识点 3 Flume采集数据会丢失吗 4 FileChannel如何优化 5 如何控制Kafka丢不丢数据 6 Kafka分区分配策略默认哪两种
  • day01(Flume)

    简介 一 概述 Flume是Apache提供的一套用于进行日志收集 汇聚和传输的框架 2 Flume的版本 Flume ng 和Flume og 不兼容 a Flume1 x Flume ng b Flume0 X Flume og htt
  • Flume 数据流监控——Ganglia的安装与部署

    1 Ganglia的安装 1 安装 dhttpd 服务与 php yasin hadoop102 flume sudo yum y install httpd php 2 安装其他依赖 atguigu hadoop102 flume sud
  • 基于Flume日志收集系统架构和设计(一)

    问题导读 1 Flume NG与Scribe对比 Flume NG的优势在什么地方 2 架构设计考虑需要考虑什么问题 3 Agent死机该如何解决 4 Collector死机是否会有影响 5 Flume NG可靠性 reliability
  • JConsole&VisualVM监控总结

    简介 JConsole 以下写作jconsole VisualVM 以下写作jvisualvm 都是比较好的JVM调优工具 且都为JDK自带 可在命令行直接启动 监控示例 Server端 需要监控的主机 配置 设置临时JAVA参数 expo
  • hadoop学习——flume的简单介绍

    flume介绍 概述 Flume最早是Cloudera提供的日志收集系统 后贡献给Apache 所以目前是Apache下的项目 Flume支持在日志系统中定制各类数据发送方 用于收集数据 Flume是一个高可用的 高可靠的 鲁棒性 robu
  • flume使用(二):采集远程日志数据到MySql数据库

    本文内容可查看目录 本文内容包含单节点 单agent 和多节点 多agent 采集远程日志 说明 一 环境 linux系统 Centos7 Jdk 1 7 Flume 1 7 0 二 安装 linux中jdk mysql的安装不多赘述 fl
  • 调速阀------电磁阀

    目录 调速阀 节流阀简图的理解 安装方式 注意 安装注意事项 电磁阀 1 直动式电磁阀 2 分步直动电磁阀 3 先导式电磁阀 二位二通电磁阀 二位三通电磁阀 二位四通电磁阀 三位三通电磁阀 三位四通电磁阀 管道联系式电磁阀 直接控制式电磁阀
  • Flume 学习

    开始启动flume的学习 todo
  • 如何在flume中同时使用regex_extractor选择器和多路复用拦截器?

    我正在测试 Flume 将数据加载到 hHase 中 并考虑使用 Flume 的选择器和拦截器进行并行数据加载 因为源和接收器之间的速度差距 所以 我想要用 Flume 做的是 使用拦截器 regexp extract 类型创建事件标头 使
  • Cloudera 5.4.2:使用 Flume 和 Twitter 流时 Avro 块大小无效或太大

    当我尝试 Cloudera 5 4 2 时出现了一个小问题 基于这篇文章 Apache Flume 获取 Twitter 数据http www tutorialspoint com apache flume fetching twitter
  • 2023_Spark_实验二十九:Flume配置KafkaSink

    实验目的 掌握Flume采集数据发送到Kafka的方法 实验方法 通过配置Flume的KafkaSink采集数据到Kafka中 实验步骤 一 明确日志采集方式 一般Flume采集日志source有两种方式 1 Exec类型的Source 可
  • Flume - 整个文件可以被视为 Flume 中的一个事件吗?

    我有一个用例 需要将目录中的文件提取到 HDFS 中 作为 POC 我在 Flume 中使用了简单的目录假脱机 其中我指定了源 接收器和通道 并且它工作得很好 缺点是我必须为进入不同文件夹的多种文件类型维护多个目录 以便更好地控制文件大小和
  • Flume的Spool Dir可以在远程机器上吗?

    每当新文件到达特定文件夹时 我就尝试将文件从远程计算机获取到我的 hdfs 我在flume中遇到了spool dir的概念 如果spool dir位于运行flume代理的同一台机器上 那么它工作得很好 有什么方法可以在远程计算机中配置假脱机
  • 运行 fatjar 时无法加载 log4j2

    我正在开发一个使用 log4j2 日志记录的项目 在 intellij 中开发时 一切正常 并且日志记录按预期完成 log4j2 xml 通过在启动时通过 intellij 设置传递给 jvm 的 java 属性进行链接 但是一旦我尝试运行
  • 并行读取 Flume spoolDir

    由于我不允许在产品服务器上设置 Flume 因此我必须下载日志 将它们放入 Flume spoolDir 中 并有一个接收器从通道中使用并写入 Cassandra 一切正常 但是 由于 spoolDir 中有很多日志文件 并且当前设置一次仅
  • 如何有效地将数据从 Kafka 移动到 Impala 表?

    以下是当前流程的步骤 Flafka http blog cloudera com blog 2014 11 flafka apache flume meets apache kafka for event processing 将日志写入

随机推荐

  • 第五章 Maven结合Junit实现单元测试

    maven的重要职责之一就是自动运行单元测试 它通过maven surefire plugin与主流的单元测试框架junit和testng集成 并且能够自动生成丰富的结果报表 maven并不是一个单元测试框架 他只是在构建执行打特定的生命周
  • RobotFramework介绍

    Robot Framework 1 入门介绍 小菠萝测试笔记 博客园 cnblogs com
  • C++——初始化列表

    初始化列表 在构造函数执行时 先执行初始化列表 实现变量的初始化 然后再执行函数内部的语句 构造函数体赋值 在创建对象时 编译器通过调用构造函数 给对象中各个成员变量一个合适的初始值 class Date public Date int y
  • css中nth-child的属性

    参数为整数 nth child 1 它表示要选择父元素中索引为该数值的子元素 此时的索引值从1开始 参数是奇数偶数 nth child odd odd表示选择奇数项的子元素 nth child even even表示选择偶数项的子元素 参数
  • Tkinter 组件详解(一):Label

    Tkinter 组件详解之Label Label 标签 组件用于在屏幕上显示文本或图像 Label 组件仅能显示单一字体的文本 但文本可以跨越多行 另外 还可以为其中的个别字符加上下划线 例如用于表示键盘快捷键 何时使用 Label 组件
  • Linux驱动之input输入子系统

    目录 前言 介绍 input dev结构体 输入子系统的使用流程 实例测试 前言 输入子系统用于实现Linux系统输入设备 鼠标 键盘 触摸屏 游戏杆 驱动的一种框架 Linux内核将其中的固定部分放入内核 驱动开发时只需要实现其中的不固定
  • Web自动化测试从基础到项目实战之一启动不同的浏览器及配置

    在web自动化中目前selenium作为底层的自动化测试是目前运用最广的 但是各个公司都会在这个基础之上进行修改 首先当我们测试环境有了之后我们需要做得就是去配置我们的driver 这里的driver你可以理解为就是我们脚本和浏览器之间的桥
  • inuxCentos7.5安装jdk1.8(勿继续踩坑)

    LinuxCentos7 5安装jdk1 8 场景 错误出现 下面到了安装步骤 场景 首先我是一名后端 其实这种工作并不应该由我来干 先甩一下锅哈哈 由于我们公司没有真正的运维 所以什么事都需要我们后端来亲力亲为 一次偶然的机遇就把我派到了
  • Vue 代码检测(ESLint)

    每个人编码的习惯不一样 或美观或不美观 或者在编码的过程中会有些疏漏未曾发现 为提高代码美观度 提高代码审阅效率 使得多人协作时代码风格统一 规定一套编码规则并在编写的过程中遵守该规则变得很有必要 在一些比较正式的大公司 公司也会有一套自己
  • SSR是什么?Vue中怎么实现?

    一 是什么 Server Side Rendering 称其为SSR 意为服务端渲染 指由服务侧完成页面的 HTML 结构拼接的页面处理技术 发送到浏览器 然后为其绑定状态与事件 成为完全可交互页面的过程 先来看看Web3个阶段的发展史 传
  • SpringBoot--将微服务注册到Eureka Server上

    这节课我们一起来学习一下如何将微服务注册到Eureka Server上 关于如何操作Eureka 我们可以参考spring cloud的官方文档 我们先访问spring cloud的官网主页 如下图所示 目前官网Spring Cloud的最
  • pycharm专业版安装方法

    1 去官网下载安装包 有专业版 有社区版 专业版需要破解 社区版不要破解 2 打开pycharm应用程序安装64 bit 需要等待几秒 3 记住安装包解压位置 打开jetbrains 找到bin文件夹 D Program Files Jet
  • Java数据结构之优先级队列(堆)

    文章目录 一 优先级队列 一 概念 二 优先级队列的模拟实现 一 堆的概念 二 堆的存储结构 三 堆的创建 1 堆的创建和向下调整 2 堆的创建和向上调整 四 堆的插入和删除 1 堆的插入 堆的创建和向上调整 续 2 堆的删除 五 用堆模拟
  • Cadence 背景颜色设置

    目录 概述 一 Allegro PCB Designer 二 OrCAD Capture 三 总结 概述 有位粉丝问我 关于背景颜色设置问题 这里我写一篇文章吧 尽自己微薄之力帮助更多的人 加油 一 Allegro PCB Designer
  • 2.22笔记:linux命令不同颜色命令

    浅蓝色 表示软链接 灰色 表示其他文件 绿色 表示可执行文件 红色 表示压缩文件 蓝色 表示目录 红色闪烁 表示链接的文件有问题了 黄色 表示设备文件 包括block char fifo 管道文件 粉色 网络文件
  • bochs+gdb联调linux-0.11内核

    终于把bochs和gdb连起来了 下面描述下步骤以作记录 1 安装bochs 前面有篇文章介绍了bochs源码编译安装过程 这里安装也非常相似 只是命令稍微有些不同 configure enable gdb stub make make i
  • Java语言连接数据库时间读取错误的问题

    连接时候的数据库相关配置
  • pcie inbound和outbound关系

    Inbound PCI域訪问存储器域 Outbound 存储器域訪问PCI域 RC訪问EP RC存储器域 gt outbound gt RC PCI域 gt EP PCI域 gt inbound gt EP存储器域 EP訪问RC EP存储器
  • Jenkins插件下载失败两种处理办法

    持续集成 自动化部署 弹性伸缩教程 http edu csdn net course detail 6452 大家在使用jenkins安装插件的时候经常遇到一下问题 就是插件由于网络或者墙的原因无法直接下载 出现下面截图的问题 处理办法有两
  • flume采集log4j日志到kafka

    简单测试项目 1 新建Java项目结构如下 测试类FlumeTest代码如下 package com demo flume import org apache log4j Logger public class FlumeTest priv