公司实战 ElasticSearch+Kafka+Redis+MySQL

2023-11-19

一、需求

前一段时间公司要进行数据转移,将我们ES数据库中的数据转移到客户的服务器上,并且使用定时将新增的数据同步,在这过程中学到了很多,在此记录一下!

二、技术栈

Mysql + Redis + ElasticSearch + Kafka

三、方案

为了降低服务器的压力,在每天的零时进行推送数据,推送前比较上一次推送记录在Redis中的数据,此记录为ES数据库中的时间字段,每次推送结束前都会将最新的时间更新在这个key中,如果获取ES数据库中的字段与key一样,说明今日无数据更新。

因为ES索引的数据量在千万以上,所以没有选择分页,而是选择了ES的滚轮查询。

 public static void getDayData(RestHighLevelClient client,
                                    KafkaTemplate kafkaTemplate,
                                    RedisUtil redisUtil,
                                    String field,
                                    String indexName,
                                    String topic) {
        //发送创建索引所需的相关信息  索引名 属性 分片
        HashMap<String, Object> map1 = new HashMap<>();
        map1.put("indices", indexName);
        map1.put("mappings", ElasticSearchUtil.getIndexMappings(client, indexName));
        map1.put("settings", ElasticSearchUtil.getIndexSettingsShards(client, indexName));
        kafkaTemplate.send(str, JSON.toJSONString(map1));


        int i = 0;
        final Scroll scroll = new Scroll(TimeValue.timeValueSeconds(30L));
        SearchRequest request = new SearchRequest(indexName);
        request.scroll(scroll);
        SearchSourceBuilder builder = new SearchSourceBuilder();
        //查询此索引的所有数据

        builder.query(
                QueryBuilders.rangeQuery(field)
                        .gt(redisUtil.hget(indexName,"push_time"))
                        ).sort(field, SortOrder.ASC);

        builder.size(1000);
        request.source(builder);
        SearchResponse response = null;
        try {
            response = client.search(request, RequestOptions.DEFAULT);
        } catch (Exception e) {
            e.printStackTrace();
        }
        String scrollId = response.getScrollId();
        SearchHit[] hits = response.getHits().getHits();
        // 没有新增数据
        if(hits == null)
            log.info("索引 {} 今日无新增数据",indexName);

        for (SearchHit hit : hits) {
            Map<String, Object> map = hit.getSourceAsMap();
            map.put("_id", hit.getId());
            kafkaTemplate.send(topic, JSON.toJSONString(map));
            i++;
        }
        //完成第一次后 更新key
        redisUtil.hset(indexName, "push_time", hits[hits.length - 1].getSourceAsMap().get(field));
        //通过在循环中调用搜索滚动 API 来检索所有搜索命中 直到不返回任何文件
        while (hits != null && hits.length > 0) {
            // 处理返回的搜索结果
            SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
            scrollRequest.scroll(scroll);
            try {
                response = client.scroll(scrollRequest, RequestOptions.DEFAULT);
            } catch (Exception e) {
                e.printStackTrace();
            }
            scrollId = response.getScrollId();
            hits = response.getHits().getHits();
            for (SearchHit hit : hits) {
                Map<String, Object> map = hit.getSourceAsMap();
                map.put("_id", hit.getId());
                kafkaTemplate.send(topic, JSON.toJSONString(map));
                i++;
                System.out.println(i);
            }
            //从第二次开始 每次都要更新key
            redisUtil.hset(indexName, "push_time", hits[hits.length - 1].getSourceAsMap().get(field));
        }
        log.info("索引 {} 总共推送了 {} 条", indexName, i);
        // 滚动完成后清除滚动上下文
        ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
        clearScrollRequest.addScrollId(scrollId);
        try {
            client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

TimeValue.timeValueSeconds(30L)
builder.size(1000)

这个参数最开始设置的是5L,但是条件查询的大小设置为了1000,可能会出现到了预计的时间但是没有找到1000条数据从而产生报错,所以尽可能将滚轮滚动的时间设置大一些,反正搜索完就会进行下一次滚动,不会产生拉低效率的问题!


在正式发送数据之前要提前将要发送的索引的信息(字段属性、分片信息)发送至Kafka的消费端,这样做的目的是如果客户服务器没有该索引就手动创建索引,一般情况来说,我们是不允许在消费端自动创建索引的,会造成字段属性出错。

获取索引属性信息和分片的工具类

    /**
     * 获取 索引 mappings
     * @param client
     * @param index
     * @return
     */
    public static Map<String, Object> getIndexMappings(RestHighLevelClient client, String index) {
        GetMappingsRequest request = new GetMappingsRequest();
        request.indices(index);
        GetMappingsResponse resp = null;
        try {
            resp = client.indices().getMapping(request, RequestOptions.DEFAULT);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return resp.mappings().get(index).getSourceAsMap();
    }


    /**
     * 获取 索引 settings的分片
     *
     * @param client
     * @param index
     * @return
     */
    public static Map<String, String> getIndexSettingsShards(RestHighLevelClient client, String index) {
        Map<String, String> resMap = new HashMap<>();
        GetSettingsRequest request = new GetSettingsRequest();
        request.indices(index);
        try {
            GetSettingsResponse resp = client.indices().getSettings(request, RequestOptions.DEFAULT);
            Settings settings = resp.getIndexToSettings().get(index);
            System.out.println(settings);
            resMap.put("number_of_shards", settings.get("index.number_of_shards"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return resMap;
    }

为了更好的区分和控制,发送创建索引消息的Topic与发送数据的Topic不是同一个,这时候存在一个问题,就是创建索引完成的时间无法控制,从而无法发送数据
我们使用到了JUC的辅助类CountDownLatch,作为一个减数器,如果索引创建完毕,减数器减一,释放锁,非常好用!

@KafkaListener(id = "IPAttack", topics = "IPAttack")
    public void IPAttackContinuous(List<String> records) throws InterruptedException {
        BulkProcessor bulkProcessor = GetBulkProcessor.getBulkProcessor(client);

        // 等待 index 创建
        if (!ElasticSearchUtil.isExistsIndex(client, index)) {
            log.error("索引: {} 还未创建", index);
            //加锁 减数器的值设置为1
            cdl = new CountDownLatch(1);
            //减数器归0才能执行下面的代码
            cdl.await();
        }
        //批量入库
        for (String record : records) {
            Map map = JSONObject.parseObject(record, Map.class);
            String _id = map.get("_id").toString();
            map.remove("_id");

            bulkProcessor.add(new IndexRequest(index).id(_id).source(map));
        }
        bulkProcessor.flush();
        bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
    }

我们发现如果没有创建好索引,线程会阻塞导致无法执行下面的代码!

  @KafkaListener(id = "CreateIndex",topics = "CreateIndex")
    public void createIndexListener(String record){
        Map map = JSON.parseObject(record,Map.class);
        String index = map.get("indices").toString();
        if(!ElasticSearchUtil.isExistsIndex(client,index)){
            log.info("索引: {} 开始创建", index);
            CreateIndexRequest indices = new CreateIndexRequest(index);
            indices.settings((Map<String, ?>) map.get("settings"));
            indices.mapping((Map<String, ?>)map.get("mappings"));
            try{
                client.indices().create(indices, RequestOptions.DEFAULT);
            }catch (Exception e){
                e.printStackTrace();
            }
            //创建索引完毕释放锁
            if(DnsReceive.cdl != null){
                DnsReceive.cdl.countDown();
            }
            log.info("索引: {} 创建完成", index);
        }else{
            log.info("已经存在索引:{}",index);
        }
    }

countDown()方法执行后,说明索引创建完毕,此时减数器减一,发送数据的Topic接收到就会开始批量数据入库
数据推送完毕后,可以将此次推送的数据量、索引名等等信息记录在MySQL中,这边还没有要求所以没有写



四、总结

整体下来锻炼了逻辑思维和写代码的能力,完成以后又想了一遍觉得其实没有那么难,但对于小白刚入职场的我来说,是一次历练,无论对于我想问题的方式还是排错的切入点都有很好的帮助!

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

公司实战 ElasticSearch+Kafka+Redis+MySQL 的相关文章

  • 如何制作行业标准的桌面Java应用程序? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • Java中使用正则表达式确定字符串是否为URL [重复]

    这个问题在这里已经有答案了 可能的重复 检查字符串是否为有效 URL 的最佳正则表达式是什么 https stackoverflow com questions 161738 what is the best regular express
  • “此 GPIO 引脚已存在:”第二次出现 GPIO 1 异常

    我正在 Raspberry pi 和 java 上工作 通过使用 pi4j 使 LED 闪烁 一切都已清除并且工作正常 LED 按照代码闪烁 但是当我第二次运行时 它会导致以下错误 我已经搜索了很多有很多相同的问题没有明确的答案如何解决 任
  • MySQL 和 Hibernate 之间的主键自增由谁负责?

    MySQL CREATE TABLE role id role INT 11 unsigned NOT NULL AUTO INCREMENT PRIMARY KEY id role AUTO INCREMENT 1 休眠 Entity p
  • 使用 TLS 证书 JDBC 连接到 Oracle 数据库

    我正在尝试用 Java 编写一个连接类来使用 JDBC 驱动程序连接到 Oracle 数据库 但我想保护用于连接到 Oracle 数据库的参数 例如 jdbcurl 用户名 密码 我必须使用 TLS 证书概念来连接到 Java 中的 Ora
  • Spring 应用程序启动前的 Spring Boot 设置日志记录

    我有一个项目 在启动 SpringApplication 之前需要日志记录机制 我怎样才能做到这一点 我尝试设置自己的日志记录机制 LogManager getLogManager readConfiguration 但在 Spring 应
  • 如何在首次运行时填充大型 SQLite 数据库

    我正在开发一个基于 SQLite 数据库的字典应用程序 该数据库包含超过 300 000 行 问题在于 最终形式的数据库文件由全文索引表组成 并且重量远远超过150Mb 我通过创建无内容的 fts4 表设法将 db 文件大小降至最低 数据库
  • Spring WebFlux:在 Spring Data MongoDB 反应存储库中的 null 值时发出异常?

    我正在尝试学习如何使用 MongoDB 反应存储库spring boot 2 0 0 M2 但我担心我没有按预期做事 这是我的方法之一 试图找到一个User通过他们的电子邮件 但如果没有 该方法应该抛出异常 Override public
  • Java 相当于 C# 的 async/await?

    我是一名普通的 C 开发人员 但偶尔也会使用 Java 开发应用程序 我想知道 Java 中是否有相当于 C async await 的东西 简单来说 java 相当于 async Task
  • Android-如何在指定时间后台下载数据

    我提前很抱歉没有发布任何代码 主要是因为我一生都无法弄清楚我需要如何做我需要做的事情 基本上 在一天中的指定时间间隔 例如下午 5 点 我希望我的应用程序从我的服务器下载一些数据并将其存储在设备上 这是为了减少每次运行应用程序时下载数据对我
  • Java:字符串连接和变量替换的最佳实践[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 在 Java 中连接字符串和添加变量值的方法有太多 我应该如何选择一个 优点 缺点 最佳用例等 MessageFormat forma
  • 在java中的super调用之前创建一个对象

    考虑到简单的java代码是行不通的 public class Bar extends AbstractBar private final Foo foo new Foo bar public Bar super foo 我需要在之前创建一个
  • 从流中过滤/删除无效的 xml 字符

    首先 我无法更改 xml 的输出 它是由第三方生成的 他们在 xml 中插入无效字符 我得到了 xml 字节流表示形式的 InputStream 除了将流消耗到字符串中并对其进行处理之外 是否有一种更干净的方法来过滤掉有问题的字符 我找到了
  • 将文件内容存储到数组中

    我的刽子手程序有问题 我真的认为我需要做的事情超出了我对java的理解 这是我的代码 import java io BufferedReader import java io FileReader import java io FileNo
  • JavaFX:在 WebView img 标签中未加载本地图像

    以下是我的代码 一切安好 我可以加载远程页面 我可以放置 HTML 内容 但我的img标签显示一个X标志表示无法加载图像 Note 我的图像与类位于同一个包中JavaFX在 Smiley 文件夹中 我可以列出所有图像 这意味着路径没有问题
  • 使用 JavaFX 和 Maven 将模块描述符添加到库中[重复]

    这个问题在这里已经有答案了 我需要使用反思 https github com ronmamo reflections在一个带有 JavaFX 的 Maven 项目中 我想使用jlink捆绑一个最小的 JRE 问题是我运行时出现以下错误mvn
  • javaFX,抛出 NullPointerException,位置是必需的

    我看过其他答案 但没有任何帮助我 抱歉 GUI新手只知道swing的基础知识 这是主课 package application import javafx application Application import javafx fxml
  • 使用 Jsoup 选择没有类的 HTML 元素

    考虑一个像这样的 html 文档 div p p p p p class random class name p div 我们怎样才能选择所有p元素 但不包括p元素与random class name class Elements ps b
  • Jsplitpane 自动调整大小

    我有一个 JSPlitPane 它们之间有 50 的分隔线 这工作正常 但是 当我在右侧添加一些 JLabels 时 jsplitpane 会忽略我的 50 分隔符 左侧窗格会增加其大小 并会挤压右侧窗格 为什么会发生这种情况以及如何解决
  • Struts2 中有多种结果类型?

    我有一个使用 Tiles 的 Struts2 应用程序 如何在操作映射中获取多种结果类型 因为我需要将de输出设置为JSON数据 并且同时Tiles 我努力了

随机推荐

  • Unity 运行FixedUpdate()无响应

    问题 最近在学习unity时 根据 史上最全Unity3D教程 哔哩哔哩 bilibili 在Visual Studio中编写如下代码时 Unity的Console面板并没有输出预期的信息 即按每个固定帧速率的帧调用FixedUpdate
  • Matlab导出动态链接库dll

    1 新建 m文件 内容 function c Add a b c a b end 保存为 Add m 2 命令行输入 gt gt mex setup MEX configured to use Microsoft Visual C 2013
  • 各种注释总结

    jsp注释 html注释
  • C语言—指针

    文章目录 1 指针 1 1 指针的定义 1 2 和 1 3 指针与堆内存 1 4 指针运算 1 5 常量指针与指针常量 1 5 1 常量指针 1 5 2 指针常量 1 6 函数指针 2 指针与数组 3 指针与函数 4 指针与链表 4 1 链
  • cmake中的编译选项

    CMake是一个跨平台的构建系统 它可以根据简单的配置文件生成各种平台的构建工具 例如Makefile Visual Studio项目文件等 CMake使用CMakeLists txt文件来描述项目的构建规则和依赖关系 在这个文件中 可以设
  • 浅谈opencv3.2中各个模块的简介

    3 2版本的模块说明 Opencv3 2模块 首先打开opencv modules hpp文件 可以看到对于各个功能模块的定义如下 This file defines the list of modules available in cur
  • 分享几个项目中用到的设计模式

    前言 之前项目中出于扩展性和有雅性的考虑 使用了多种设计模式进行项目框架的设计 主要的一些设计模式是单例模式 工厂模式 策略模式 责任链模式 代理模式这几种 现在依次讲讲这几个的主要是实现方式和在我们项目中的应用场景 核心设计模式分享 单例
  • WPF TextBlock 实现点击事件

    TextBlock 标签里定义MouseLeftButtonDown 事件 xaml cs
  • ICCV 2023

    ICCV 2023 MPI Flow 从单视角构建的多平面图像中学习光流 引言 主要贡献 Motivation 算法细节 Optical Flow Data Generation Independent Object Motions Dep
  • Node之使用dns模块解析域名

    引 在网络编程中 开发者更倾向于使用域名 而不是IP地址来指定网络连接的目标地址 在Node js中 提供dns模块 以实现域名查找及域名解析的处理 在dns模块中 提供了三个主方法及一系列便捷方法 其中三个主方法分别为用于将一个域名解析为
  • MySQL使用查询结果生成临时表

    MySQL中不支持对同一个表使用其查询结果更新or删除本表内数据 也就是update或delete后的where条件为针对相同表的select 解决方案是创建临时表做过度保存中间数据 可以直接使用查询结果来形成临时表 CREATE TABL
  • verilog奇数分频器的问题讲解(7分频为例)

    先不多哔哔 直接上代码 verilogHDL 代码的后面讲原理 module fenpin3 clk clk7 rst input clk rst 设置rst的目的是当rst 1的时候给cnt0和cnt1赋初值 output clk7 re
  • python sslerror_如何解决“不良握手”问题利用python请求时的SSLErrors

    I m trying to get access to the BambooHR API documentation here but I receive the following error params user username p
  • GREASELM: GRAPH REASONING ENHANCED LANGUAGE MODELS FOR QUESTION ANSWERING

    本文是LLM系列文章 针对 GREASELM GRAPH REASONING ENHANCED LANGUAGE MODELS FOR QUESTION ANSWERING 的翻译 GREASELM 图推理增强的问答语言模型 摘要 1 引言
  • 小霸王其乐无穷之函数回调

    小霸王游戏机是中国上一代备受欢迎的家用游戏机 它在1990年代初期开始流行 当时 由于游戏软件受限 国内的游戏市场相对匮乏 这使得小霸王游戏机成为许多70 80后童年时光中难忘的一部分 小霸王游戏机分为两大主要部分 游戏机本身和卡带 游戏机
  • Python中的CALL_FUNCTION指令

    在Python字节码中 CALL FUNCTION指令后跟的数字代表这次函数调用需要从栈上取出的参数的数量 具体来说 这个数字包括位置参数和关键字参数的数量 这个数字的低两位表示位置参数的数量 然后每两位表示一个关键字参数的数量 因此 如果
  • LLVM Language Reference Manual---阅读笔记

    文档地址 http llvm org docs LangRef html LLVM IR的标示符有两种基本类型 全局的和局部的 全局标示符以 开头 局部标示符以 开头 LLVM IR的标示符有三种形式 命名的 未命名的 常量 每一个Moud
  • Python pyecharts数据可视化

    Python pyecharts数据可视化 绘制精美图表 一 数据可视化 1 pyecharts介绍 2 初入了解 1 快速上手 2 简单的配置项介绍 3 案例实战 1 柱状图Bar 2 地图Map 省份 城市 地区 3 饼图Pie Pie
  • 【SMTP】【POP】电子邮件相关协议分析

    一 实验环境 通过普通路由器连接英特网的计算机一台 通过VMWare安装的Linux虚拟机一台 抓包工具 Wireshark 邮件处理软件 Foxmail 二 实验原理 SMTP工作原理 SMTP提供了一种邮件传输的机制 当收件方和发件方都
  • 公司实战 ElasticSearch+Kafka+Redis+MySQL

    一 需求 前一段时间公司要进行数据转移 将我们ES数据库中的数据转移到客户的服务器上 并且使用定时将新增的数据同步 在这过程中学到了很多 在此记录一下 二 技术栈 Mysql Redis ElasticSearch Kafka 三 方案 为