Spring Cloud集成ELK完成日志收集实战(elasticsearch、logstash、kibana)

2023-11-14

简介

对于日志来说,最常见的需求就是收集、存储、查询、展示,开源社区正好有相对应的开源项目:logstash(收集)、elasticsearch(存储+搜索)、kibana(展示),我们将这三个组合起来的技术称之为ELK,所以说ELK指的是Elasticsearch、Logstash、Kibana技术栈的结合。ELK对外作为一个日志管理系统的开源方案,能够可靠和安全地从任何格式的任何来源获取数据,并实时搜索、分析和可视化。

1 Elasticsearch

elasticsearch是一个高可扩展的、开源的、全文本搜索和分析的引擎。它能够近乎实时地存储,检索和分析大量数据,通常用作底层引擎/技术,为具有复杂搜索特性和需求的应用程序提供动力。

elasticsearch的底层是开源库 Lucene。但是,你没法直接用 Lucene,必须自己写代码去调用它的接口。Elastic 是 Lucene 的封装,提供了 REST API 的操作接口,开箱即用。

1.1节点和集群

elasticsearch 本质上是一个分布式数据库,允许多台服务器协同工作,每台服务器可以运行多个 elasticsearch 实例。单个 elasticsearch 实例称为一个节点(node)。一组节点构成一个集群(cluster)

1.2索引(Index)

elasticsearch 会索引所有字段,经过处理后写入一个反向索引(Inverted Index)。查找数据的时候,直接查找该索引。所以,elasticsearch 数据管理的顶层单位就叫做 Index(索引)。它是单个数据库的同义词。每个 Index (即数据库)的名字必须是小写。

下面的命令可以查看当前节点的所有 Index。

curl -X GET ‘http://localhost:9200/_cat/indices?v’

1.3文档(Document)

索引里面的单条记录称为文档(Document),多个文档就组成了一个索引。文档是用JSON格式表示。同一个索引的文档不要求有相同的结构(scheme),但是最好保持相同,这样有利于提高搜索效率。一个简单的文档示例:
{
“user”: “张三”,
“profession”: “java工程师”,
}

1.4 碎片和副本

索引可能存储大量数据,超出单个节点的硬件限制。例如,一个包含10亿个文档的索引占用了1TB的磁盘空间,它可能不适合于单个节点的磁盘,或者可能太慢,无法单独为单个节点提供搜索请求。为了解决这个问题,Elasticsearch提供了将索引细分为多个碎片的功能。当你创建索引时,可以简单地定义你想要的碎片的数量。每个碎片本身都是一个功能齐全、独立的“索引”,可以驻留在集群中的任何节点上。

总而言之,每个索引可以分成多个碎片。一个索引也可以被复制零次(意思是没有副本)或多次。一但复制,每个索引将具有主碎片(原始碎片)和复制碎片(主碎片的副本)。在创建索引时,每个索引可以定义碎片和副本的数量。默认情况下,Elasticsearch中的每个索引分配5个主碎片和1个副本。

关于elasticsearch的深入了解请参考elastic官方网站:https://www.elastic.co/cn/

2 Logstash

Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地。

2.1管道

logstash的事件处理管道通常具有一个或多个的输入插件、过滤器、输出插件。logstash的事件处理通常分为三个阶段:输入→过滤器→输出。

2.2输入

数据往往以各种各样的形式,或分散或集中地存在于很多系统中。Logstash 支持各种输入选择 ,可以在同一时间从众多常用来源捕捉事件。能够以连续的流式传输方式,轻松地从您的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。

logstash使用输入插件实现数据导入。常用的输入插件如下:

插件 说明
tcp 从TCP套接字读取事件
http 从http请求中读取事件
file 从文件中读取
beats 从Elastic Beats框架接收事件
kafka 从kafka中读取
rabbitmq 从rabbitmq中读取
redis 从Redis实例读取事件
log4j 从Log4j SocketAppender对象通过TCP读取事件
elasticsearch 从Elasticsearch集群读取
jdbc 从jdbc数据中读取
websocket 从网络套接字读取事件

Filebeat客户端是一个轻量级的、资源友好的工具,它从服务器上的文件中收集日志,并将这些日志转发到你的Logstash实例以进行处理。Filebeat设计就是为了可靠性和低延迟。Filebeat在主机上占用的资源很少,而且Beats input插件将对Logstash实例的资源需求降到最低。

关于更多输入插件参照:
https://www.elastic.co/guide/en/logstash/current/input-plugins.html

2.3 过滤器

数据从源传输到存储库的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值。

Logstash 能够动态地转换和解析数据,不受格式或复杂度的影响。常用的过滤器如下:

1.grok插件:解析并构造任意文本。Grok是目前Logstash中将非结构化日志数据解析为结构化和可查询内容的最佳方式。有了内置于Logstash的120种模式,很可能会找到满足需求的模式!

2.ruby插件: 官方对ruby插件的介绍是无所不能。ruby插件可以使用任何的ruby语法,无论是逻辑判断,条件语句,循环语句,还是对字符串的操作,对EVENT对象的操作,都是极其得心应手的。

3.mutate插件: mutate插件是用来处理数据的格式的。

4.json插件:这个插件也是极其好用的一个插件,现在我们的日志信息,基本都是由固定的样式组成的,我们可以使用json插件对其进行解析,并且得到每个字段对应的值。

更多过滤器插件请参考:
https://www.elastic.co/guide/en/logstash/current/filter-plugins.html

2.4 输出

输出是Logstash管道的最后阶段。事件可以通过多个输出,但是一旦所有输出处理完成,事件就完成了它的执行。 Logstash有很多输出选择,最常用的是输出到elasticsearch。常见的输出还有:

1.elasticsearch
2.File
3.Emial
4.http
5.Kafka
6.Redis
7.MongoDB
8.Rabbitmq
9.Syslog
10.Tcp
11.Websocket
12.Zabbix
13.Stdout
14.Csv

更多输出插件请参照:
https://www.elastic.co/guide/en/logstash/current/output-plugins.html

3 Kibana

Kibana 是一款开源的数据分析和可视化平台,它是 Elastic Stack 成员之一,设计用于和 Elasticsearch 协作。您可以使用 Kibana 对 Elasticsearch 索引中的数据进行搜索、查看、交互操作。您可以很方便的利用图表、表格及地图对数据进行多元化的分析和呈现。

Kibana 可以使大数据通俗易懂。它很简单,基于浏览器的界面便于您快速创建和分享动态数据仪表板来追踪 Elasticsearch 的实时数据变化。
更多资料请参考:
https://www.elastic.co/guide/cn/kibana/current/index.html

4 ELK实战

此方案大致思路如下:
1.应用将日志传入kafka。
2.logstash在kafka上消费(读取)日志内容,写入elasticsearch。
3.kibana读elasticsearch,做对应的展示。

这样的好处是:
1)几乎不用做特别大的修改,只需做一定的配置工作即可完成日志收集;
2)日志内容输入kafka几乎没有什么瓶颈,可以做到解耦,另外kafka的扩展性能很好,也很简单;
3)收集的日志几乎是实时的;
4)整体的扩展性很好,很容易消除瓶颈,例如elasticsearch分片、扩展都很容易。

方案架构图如下:

在这里插入图片描述

4.1 使用docker-compose安装ELK组件

本示例依赖于:

  • jdk8
  • docker 18.06.1
  • docker-compose 1.24.1
  1. 编写docker-compose.yml 文件

安装两个elasticsearch集群,kibana,logstash

vim docker-compose.yml

文件内容如下:

version: '3'
services:
  elasticsearch:
    image: elasticsearch:7.5.2
    container_name: jdyp-elasticsearch
    environment:
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - node.name=elasticsearch
      - cluster.name=docker-elasticsearch-cluster
      - xpack.security.enabled=false
      - discovery.zen.minimum_master_nodes=1
      - discovery.zen.ping.unicast.hosts=ip:9300, ip:9301
      - http.cors.enabled=true
      - http.cors.allow-origin="*"

    volumes:
      - esdata:/usr/share/elasticsearch/data
    hostname: elasticsearch
    network_mode: host
    restart: always
    ports:
      - 9200:9200
      - 9300:9300
  es-slave:
    image: elasticsearch:7.5.2
    container_name: jdyp-es-slave
    environment:
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - node.name=es-slave
      - cluster.name=docker-elasticsearch-cluster
      - xpack.security.enabled=false
      - discovery.zen.minimum_master_nodes=1
      - discovery.zen.ping.unicast.hosts=ip:9300, ip:9301
      - http.cors.enabled=true
      - http.cors.allow-origin="*" 
    volumes:
      - esdata2:/usr/share/elasticsearch/data
    hostname: es-slave
    network_mode: host
    restart: always
    ports:
      - 9201:9200
      - 9301:9300

  kibana:
    image: kibana:7.5.2
    container_name: jdyp-kibana
    hostname: kibana
    network_mode: host
    environment:
      - ELASTICSEARCH_URL=http://ip:9200
      - ELASTICSEARCH_HOSTS=http://ip:9200
    depends_on:
      - elasticsearch
      - es-slave
    restart: always
    ports:
      - "5601:5601"
  logstash:
    image: logstash:7.5.2
    container_name: jdyp-logstash
    command: logstash -f /etc/logstash/conf.d/logstash.conf
    volumes:
      - ./logstash:/etc/logstash/conf.d
      - /opt/logs/:/opt/build/
    environment:
      - elasticsearch.hosts=http://ip:9200
      # 解决logstash监控连接报错
      - xpack.monitoring.elasticsearch.hosts=http://ip:9200
    hostname: logstash
    network_mode: host
    restart: always
    depends_on:
      - elasticsearch
      - es-slave
    ports:
      - 9600:9600
      - 5044:5044
volumes:
  esdata:
    driver: local
  esdata2:
    driver: local

2.编写logstash配置文件

本方案将日志从kafka输入,存储到elasticsearch。
在docker-compose.yml文件目录下的logstash文件下编写logstash.conf文件。
vim ./logstash/logstash.conf

文件内容如下:

input {
 
    tcp {
    port => 5044
    codec => "json"
  }
 
    kafka {
      bootstrap_servers => "http://ip:9092" #kafka地址
      topics=>["applog"]
    }
}

filter {
    json {
             source => "message"
         }
}

output{
  elasticsearch {
    hosts => ["ip:9200"]     #elastic地址
    action => "index"
    index => "keyway-log-%{+YYYY.MM.dd}"
    }
  stdout {
    codec => rubydebug
    }
}

3.启动

在docker-compose.yml同级目录下使用命令:docker-compose up -d

如果系统中没有镜像会先去拉取镜像,耗时较长需要等待,另外可以配置国内镜像加速器加快镜像的拉取速度,这里选择阿里云的镜像加速器。
阿里云:https://dev.aliyun.com/search.html
注意阿里云需要登录,每个人对应一个地址。登录之后在镜像那搜索docker:

在这里插入图片描述
然后选择镜像加速器:
在这里插入图片描述

获取加速地址后,输入命令:vim /etc/docker/daemon.json

复制获取的地址:

{
  "registry-mirrors": ["https://你的地址"]
}

保存退出,然后重启docker:

sudo systemctl daemon-reload

sudo systemctl restart docker

等待完成后输入 docker ps 查看部署的容器,如果存在则说明部署成功了。然后输入 curl:http://localhost://127.0.0.1:9200,如果出现如下界面则说明elasticsearch部署成功:

在这里插入图片描述

接下来在浏览器输入 ip:5601,如果出现以下界面则说明kibana安装成功:
在这里插入图片描述

另外。由于日志采用kafka输入,因此还需要安装kafka

  1. 部署kafka

编写docker-compose.yml文件

cd /opt/docker/kafka (可以是任意你想要的目录)

vim docker-compose.yml

文件内容如下:

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: jdyp-zoo1
    ports:
      - "2181:2181"
    network_mode: host
  kafka:
    image: wurstmeister/kafka
    container_name: jdyp-kafka
    depends_on:
      - zookeeper
    network_mode: host
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 5
      KAFKA_ZOOKEEPER_CONNECT: 192.168.7.211:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.7.211:9092  #宿主机监听端口
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

在docker-compose.yml同级目录下执行命令:docker-compose up -d

执行完后,可以通过命令 docker ps查看容器是否启动成功,启动成功后可以进入容器测试kafka功能是否正常。

进入容器指令:docker exec -it kafka的容器id /bin/bash

此次,Spring Cloud整合ELK的准备工作就完成了。

4.2 Spring Cloud集成 ELK

Spring Cloud版本:Spring cloud Finchley.SR2

4.2.1 pom文件引入依赖

 <!--引入logback日志输出配置,这时由于kafka绑定的是日志事件-->
<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>5.2</version>
</dependency>
<dependency>
    <groupId>com.github.danielwegener</groupId>
    <artifactId>logback-kafka-appender</artifactId>
    <version>0.1.0</version>
</dependency>

4.2.2 配置文件

添加logback.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true">
    <appender name="KafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <encoder class="com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder">
            <layout class="net.logstash.logback.layout.LogstashLayout" >
                <includeContext>true</includeContext>
                <includeCallerData>true</includeCallerData>
                <customFields>{"system":"service-1"}</customFields>
<!--                <fieldNames class="net.logstash.logback.fieldnames.ShortenedFieldNames"/>-->
            </layout>
            <charset>UTF-8</charset>
        </encoder>
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter"><!-- 只打印info及其以上的日志 -->
            <level>INFO</level>
            <!--<onMatch>ACCEPT</onMatch>-->
            <!--<onMismatch>DENY</onMismatch>-->
        </filter>
        <!--kafka topic -->
        <topic>applog</topic>
        <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.HostNameKeyingStrategy" />
        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />
        <producerConfig>bootstrap.servers=ip:9092</producerConfig>
    </appender>
       <root level="info">
        <appender-ref ref="KafkaAppender"/>
    </root>

</configuration>

注意: bootstrap.servers=ip:9092填写你自己的kafka所在服务器的ip地址

application指定日志文件
logging:
config: classpath:logback.xml

至此,Spring Cloud整合ELK已经弄好了。调用微服务接口,然后就可以在kibana上看到日志信息,如图所示:
在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spring Cloud集成ELK完成日志收集实战(elasticsearch、logstash、kibana) 的相关文章

随机推荐

  • flutter iOS 缺少通知权限,缺少位置权限

    App Store Connect 亲爱的开发者 我们发现了一个或多个问题与您的应用程序 hayya附近的朋友Chat Meet 1 0 3 1 最近的交付 您的交付是成功的 但您可能希望在您的下一次交付纠正以下问题 ITMS 90078
  • docker问题笔记--前端容器更新失败

    问题描述 情况是这样的 今天由于项目需要 更新了前端容器 但是重启之后发现还是旧的前端 一时间有点迷惑 我的更新方式是为了图省事 并没有用新的前端镜像去重新起一个容器 而是直接用dist文件夹内容替换前端容器中的frontend文件夹内容
  • 在vue项目中引入jssdk所遇到的各种问题

    由于在最近的项目中 需要用到扫一扫二维码签到的功能 在纯H5的页面中要实现这个是不太可能的 所以考虑用jssdk或者混合开发 由于没有微信公众号和混合开发的经验 混合开发不太现实 公司没有考虑这个 而jssdk有其他公众号平台的公司配合 所
  • 封装HashMap加入URLdecoder解码器,防注入

    其中URLDecoder decode有个好处 就是防止 sql注入 当然对其他字符无效了 当在input输入用户名时候 10001 经过后台先获取 并解码 会报错 package test import java io Unsupport
  • 51单片机矩阵键盘

    目录 前言 一 矩阵键盘扫描 二 LCD1602显示矩阵键盘键值 三 趣味小项目 密码锁 总结 前言 矩阵键盘的原理很浅显易懂 不涉及任何寄存器 就是纯线路连接 一 矩阵键盘扫描 矩阵键盘的内部接线图如下 不难看出其组成就是多个独立按键彼此
  • vue.js使用webpack发布,部署到服务器上之后如何在浏览器中查看到vue文件源码

    webpack vue 2 0打包发布之后 将发布的文件部署到服务器中之后 浏览器中访问的时候会出现一个webpack文件夹 里边会显示vue文件源码 如果想让vue源文件显示出来 可以在config index js 中 build 下的
  • 利用sqlmap进行post注入学习笔记

    了解sqlmap sqlmap是一款开源 功能强大的自动化SQL注入工具 支持Access MySQL Oracle SQL Server DB2等多种数据库 支持get post cookie注入 支持基于布尔的盲注 基于时间的盲注 错误
  • Mac上安装dlib的一堆坑

    Failed to build dlib ERROR Could not build wheels for dlib which is required to install pyproject toml based projects 主要
  • 指针笔试题(一)

    include
  • Tomcat启动报错Port 8080 required by Tomcat v9.0 Server at localhost is already in use. The server may al

    Port 8080 required by Tomcat v9 0 Server at localhost is already in use The server may already be running in another pro
  • VMware创建Linux虚拟机之(三)Hadoop安装与配置及搭建集群

    Hello world 本篇博客使用到的工具有 VMware16 Xftp7 若不熟悉操作命令 推荐使用带GUI页面的CentOS7虚拟机 我将使用带GUI页面的虚拟机演示 虚拟机 Virtual Machine 指通过软件模拟的具有完整硬
  • MyBatis 多对多 中间表插入数据

    在做这个员工管理系统demo的时候 由于user和role是多对多关系 且user主键是自增的 所有我们没办法提前知晓这个user id 所以插入的时候 就需要先插入user 然后再找到刚插入的id拿出来 再插入中间表user role 这
  • [913]MySQL查看数据库表容量大小

    查看一个数据中所有表的相关信息 1 可以在命令下使用show table status G命令查看 2 如果想知道MySQL数据库中每个表占用的空间 表记录的行数的话 可以打开MySQL的 information schema 数据库 在该
  • R语言实现的长转宽

    现在给大家介绍的数据处理技巧是长转宽 也就相当于Excel中的转置 不过用R语言实现的长转宽还有数据合并的功能 自然比Excel强大多了 这里给大家介绍4个函数 其中melt dcast 来自reshape2包 gather spread
  • stm32裸机开发下利用MultiTimer多任务时间片询

    stm32裸机开发下利用MultiTimer多任务时间片询 MultiTimerGithub地址 https github com 0x1abin MultiTimer 这是一个类似Arduino平台上的Ticker库 如需阅读懂源码 起码
  • 【Linux】管道

    管道命令 include
  • 云服务器1:云服务器能干什么

    云服务器1 云服务器能干什么 云服务器能干什么 服务器是啥 就是一个24小时不断电的电脑 有linux系统 windows2003 2013 你可以用他来挂qq 挂软件 挂一切你想挂的 除了游戏 因为他本身是用来为大家提供远程信息处理服务的
  • 微信小程序组件:多图上传

    由于在小程序开发过程中多次用到图片上传功能 在最近一次项目开发时 决定将其打包成组件来提高复用性 首先 在components文件夹下 新建Component 名称为 image uploader image uploader wxml
  • 创新管理 一

    每几天一篇 业界学习知识分享 请关注 如有同感请加vip阅读 创新的动力 创新无处不在 因为都在嘴 dk 创新动力是如果不做命运基本由时间决定 我们需要传递价值 服务 我们听过 微软离倒闭只有两年 只有偏执狂才能存活下来 而道之意 既是永远
  • Spring Cloud集成ELK完成日志收集实战(elasticsearch、logstash、kibana)

    简介 对于日志来说 最常见的需求就是收集 存储 查询 展示 开源社区正好有相对应的开源项目 logstash 收集 elasticsearch 存储 搜索 kibana 展示 我们将这三个组合起来的技术称之为ELK 所以说ELK指的是Ela