构建实时数据可视化监控的全栈实现(Kafka+Spark+TimescaleDB+Flask+Node.js)

2023-10-29

因为项目需求,需要构建一个实时的数据监控系统,把平台上报的业务数据以1分钟的粒度进行呈现。为此我构建了以下的一个架构来实现。

平台上报的业务数据会实时的发送消息给Kafka,例如平台每次为车辆进行OTA升级时,会发送一个OTA业务请求的事件,一个OTA业务完成或失败的事件。这些事件会发送到Kafka,然后Spark Streaming会进行实时的数据处理并保存到时序数据库。前端的WEB 报表平台会每一分钟调用后端提供的RESTAPI来读取数据库,进行报表的刷新。

Kafka消息系统的搭建

首先是搭建Kafka,我下载的是kafka 2.12版本。按照官网的介绍先启动zookeeper: bin/zookeeper-server-start.sh config/zookeeper.properties,然后启动kafka: bin/kafka-server-start.sh config/server.properties

编写一个producer程序模拟发送OTA的请求事件,以及OTA执行成功或失败的事件,如以下代码:

from kafka import KafkaProducer
import datetime
import time
import random

producer = KafkaProducer(bootstrap_servers='localhost:9092')
for i in range(100):
    ts = datetime.datetime.now().isoformat()
    msg = ts+','+'OTA request'
    producer.send('OTA',msg.encode('utf8'))
    time.sleep(0.5)
    flag = random.randint(1,10)
    ts = datetime.datetime.now().isoformat()
    if flag>2:
        msg = ts+','+'OTA complete'
    else:
        msg = ts+','+'OTA failure'
    producer.send('OTA',msg.encode('utf8'))

编写一个consumer程序订阅OTA这个主题,接收事件:

from kafka import KafkaConsumer
 
consumer = KafkaConsumer('OTA')
for msg in consumer:
    print((msg.value).decode('utf8'))

运行producer和consumer,可以看到能正常发送和接收事件。

创建时序数据库

因为数据是按照时间顺序实时上报的,因此采用时序数据库来进行数据的存放和后期的读取是最有效的。TimescaleDB是一个开源的时序数据库,可以作为postgres的插件运行。具体如何使用可以上官网https://docs.timescale.com/了解,官网上还有很好的一些教程,对纽约的出租车的出行情况进行数据分析。安装好timescaleDB之后,我们就可以在postgres上来创建一个数据库了。以下是创建一个OTA数据库,里面定义了一张ota的数据表,包括了2个字段时间戳和业务类型

create database ota;
\c ota
CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;
CREATE TABLE "ota" (ts TIMESTAMPTZ, serviceType text);

SPARK Streaming处理实时数据

接下来就是用一个实时数据处理平台来订阅Kafka的数据并实时写入到时序数据库了。这里我用的是SPARK streaming。先定义一个获取数据库连接池的程序connection_pool.py

import psycopg2
from psycopg2.pool import SimpleConnectionPool

conn_pool = SimpleConnectionPool(1,10,"dbname=ota user=postgres password=XXXXXX")

def getConnection():
    return conn_pool.getconn()

def putConnection(conn):
    conn_pool.putconn(conn)

def closeConnection():
    conn_pool.closeall()

实时处理Kafka数据并写入数据库sparkstream.py

from kafka import KafkaProducer
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark import SparkConf, SparkContext
import connection_pool
offsetRanges = [] 

def start():  
    sconf=SparkConf()  
    sconf.set('spark.cores.max',3)  
    sc=SparkContext(appName='OTAStream',conf=sconf)  
    ssc=StreamingContext(sc,5)  
    brokers ="localhost:9092"  
    topic='OTA'  
    start = 0  
    partition = 0  
    ota_data = KafkaUtils.createDirectStream(
        ssc,
        [topic],
        kafkaParams={"metadata.broker.list":brokers},
        fromOffsets={TopicAndPartition(topic,partition):start}
    )  
    ota_data.foreachRDD(offset)
    ota_fields = ota_data.map(lambda x:x[1].split(','))
    ota_fields.foreachRDD(lambda rdd: rdd.foreachPartition(echo))
    ssc.start()  
    ssc.awaitTermination(15)  
    connection_pool.closeConnection()
 
def offset(rdd):  
    global offsetRanges  
    offsetRanges = rdd.offsetRanges()  

def echo(recordOfPartition):  
    conn = connection_pool.getConnection()
    cursor = conn.cursor()
    for record in recordOfPartition:
        sql = "insert into ota values('%s', '%s')" %(record[0], record[1])
        cursor.execute(sql)
    conn.commit()
    connection_pool.putConnection(conn)

if __name__ == '__main__':  
    start()

在SPARK的目录下运行以下命令来提交任务:bin/spark-submit --jars jars/spark-streaming-kafka-0-8-assembly_2.11-2.4.5.jar --py-files ~/projects/monitor/connection_pool.py ~/projects/monitor/sparkstream.py,同时启动Kafka的producer程序,之后查询ota数据库,可以看到里面会有相应的数据产生。

定义后端RESTAPI查询数据库

后端需要提供数据给前端,这里我用Flask来创建一个RESTAPI,代码如下:

from flask import make_response, Flask
from flask_cors import CORS
import psycopg2
import json
import math
conn = psycopg2.connect("dbname=ota user=postgres password=123456")
cursor = conn.cursor()
sql = "select a.five_sec, a.cnt as complete, b.cnt as failure, cast(a.cnt as float)/(a.cnt+b.cnt) as percent from " + \
    "(SELECT time_bucket('5 second', time) AS five_sec, count(*) as cnt FROM ota " + \
    "WHERE servicetype='OTA complete' GROUP BY five_sec ORDER BY five_sec) a " + \
    "full join " + \
    "(SELECT time_bucket('5 second', time) AS five_sec, count(*) as cnt FROM ota " + \
    "WHERE servicetype='OTA failure' GROUP BY five_sec ORDER BY five_sec) b " + \
    "on a.five_sec=b.five_sec ORDER BY a.five_sec DESC LIMIT 10;"
app = Flask(__name__)
CORS(app, supports_credentials=True)
@app.route('/ota')
def get_ota():
    cursor.execute(sql)
    timebucket = []
    complete_cnt = []
    failure_cnt = []
    complete_rate = []
    records = cursor.fetchall()
    for record in records:
        #timebucket.append(record[0].strftime('%Y-%m-%d %H:%M:%S'))
        timebucket.append(record[0].strftime('%H:%M:%S'))
        complete_cnt.append(0 if record[1]==None else record[1])
        failure_cnt.append(0 if record[2]==None else record[2])
        if record[1]==None:
            rate = 0.
        elif record[2]==None:
            rate = 100.
        else:
            rate = round(record[3],3)*100
        complete_rate.append(rate)
    timebucket = list(reversed(timebucket))
    complete_cnt = list(reversed(complete_cnt))
    failure_cnt = list(reversed(failure_cnt))
    complete_rate = list(reversed(complete_rate))
    result = {'category':timebucket,'complete':complete_cnt,'failure':failure_cnt,'rate':complete_rate}
    response = make_response(json.dumps(result))
    return response

以上代码中可以见到,采用时序数据库,可以很方便的对数据按时间顺序进行分桶查询,例如我以上的代码是对数据按照每5秒的间隔统计一次,计算每5秒内的OTA业务完成的次数和失败的次数,并计算完成率。最后把结果以JSON方式返回。

前端报表监控

最后一部分就是在前端进行报表展现,这里采用的是ECHARTS,这是百度开源的一个报表Javascript模块。我用Node.js来搭建前端的界面。

在命令行输入以下命令:

mkdir react-monitor
npm init -y
npm -i webpack webpack-cli -D
npm -i -D babel-core babel-loader@7 babel-preset-env babel-preset-react

编辑webpack.config.js文件:

var webpack = require('webpack');
var path = require('path');
const {CleanWebpackPlugin} = require("clean-webpack-plugin");
var APP_DIR = path.resolve(__dirname, 'src');
var BUILD_DIR = path.resolve(__dirname, 'dist');
const HtmlWebpackPlugin = require("html-webpack-plugin");
var config = {
    entry:APP_DIR+'/index.jsx',
    output:{
        path:BUILD_DIR,
        filename:'bundle.js'
    },
    module:{
        rules:[
            {
                test:/\.(js|jsx)$/,
                exclude:/node_modules/,
                use:{
                    loader:"babel-loader"
                }
            },
            {
                test:/\.css$/,
                loader:'style-loader!css-loader'
            }
        ]
    },
    devServer:{
        port:3000,
        contentBase:"./dist"
    },
    plugins:[
        new HtmlWebpackPlugin({
            template: "index.html",
            inject: true,
            sourceMap: true,
            chunksSortMode: "dependency"
        }),
        new CleanWebpackPlugin()
    ]
};
module.exports = config;

创建.babelrc文件,如以下配置:

{
    "presets": ["env","react"],
    "plugins": [[
        "transform-runtime",
        {
          "helpers": false,
          "polyfill": false,
          "regenerator": true,
          "moduleName": "babel-runtime"
        }
    ]]
}

在命令行中输入以下命令,安装NPM的包

npm install react react-dom -S
npm install html-webpack-plugin clean-webpack-plugin -D
npm install axios --save
npm install echarts --save

创建一个index.html页面,用于放置图表:

<!DOCTYPE html>
<html lang="en">
    <head>
        <meta charset="UTF-8">
        <meta name="viewport" content="width=device-width,initial-scale=1.0">
        <meta http-equiv="X-UA-Compatible" content="ie=edge">
        <title>hello world</title>
    </head>
    <body>
        <div id="chart" style="height:500px;width:1000px"></div>
    </body>
</html>

在src目录下创建一个index.jsx文件,用于创建react component封装echarts:

import React from 'react';
import {render} from 'react-dom';
import echarts from 'echarts';
import axios from 'axios';

let option = {
    tooltip: {
        trigger: 'axis',
        axisPointer: {
            type: 'cross',
            crossStyle: {
                color: '#999'
            }
        }
    },
    toolbox: {
        feature: {
            dataView: {show: true, readOnly: false},
            magicType: {show: true, type: ['line', 'bar']},
            restore: {show: true},
            saveAsImage: {show: true}
        }
    },
    legend: {
        data: ['OTA Complete', 'OTA Failure', 'OTA Complete Rate']
    },
    xAxis: [
        {
            type: 'category',
            data: [],
            axisPointer: {
                type: 'shadow'
            },
            axisLabel: {
                interval: 0,
                rotate: 60
            }
        }
    ],
    yAxis: [
        {
            type: 'value',
            name: 'Count',
            min: 0,
            max: 20,
            interval: 5,
            axisLabel: {
                formatter: '{value}'
            }
        },
        {
            type: 'value',
            name: 'Percent',
            min: 0,
            max: 100,
            interval: 10,
            axisLabel: {
                formatter: '{value} %'
            }
        }
    ],
    series: [
        {
            name: 'OTA Complete',
            type: 'bar',
            data: []
        },
        {
            name: 'OTA Failure',
            type: 'bar',
            data: []
        },
        {
            name: 'OTA Complete Rate',
            type: 'line',
            yAxisIndex: 1,
            data: []
        }
    ]
};

class App extends React.Component{
    constructor(props){
        super(props);
        this.state={
            category:[],
            series_data_1:[],
            series_data_2:[],
            series_data_3:[]
        };
    }
    async componentDidMount () {
        let myChart = echarts.init(document.getElementById('chart'));
        await axios.get('http://localhost:5000/ota')
        .then(function (response){
            option.xAxis[0].data = response.data.category;
            option.series[0].data = response.data.complete;
            option.series[1].data = response.data.failure;
            option.series[2].data = response.data.rate;
        })
        .catch(function (error) {
            console.log(error);
        });
        myChart.setOption(option,true)
        this.state.timer=setInterval(async ()=>{
            await axios.get('http://localhost:5000/ota')
            .then(function (response){
                option.xAxis[0].data = response.data.category;
                option.series[0].data = response.data.complete;
                option.series[1].data = response.data.failure;
                option.series[2].data = response.data.rate;
            })
            .catch(function (error) {
                console.log(error);
            });
            myChart.setOption(option,true);
        }, 1000*2)
    }
    async getData(){
        return await axios.get('http://localhost:5000/ota');
    }
    render(){
        return 'abc'
    }
    componentWillUnmount() {
        clearInterval(this.interval);
    }
}
render(<App/>, document.getElementById('chart'));

运行效果

现在可以检验一下效果了,按照以下步骤执行:

  1. 开启Kafka, 往OTA topic发布测试数据
  2. 提交SPARK任务,实时处理数据并写入timescaleDB
  3. 运行Flask,执行命令export FLASK_APP=flaskapi.py, flask run
  4. 在React项目中执行npm run start
  5. 打开浏览器,输入localhost:3000,即可看到报表每2秒更新一次,如以下界面

 

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

构建实时数据可视化监控的全栈实现(Kafka+Spark+TimescaleDB+Flask+Node.js) 的相关文章

随机推荐

  • 完美的Apache静态.htaccess文件 [discuz和home带301重定向]

    完美的Apache静态 htaccess文件 discuz和home带301重定向 本帖最后由 下砂 于 2009 11 13 10 32 编辑 先后修改过三次 加了301重定向 顶级域名和论坛二级域名 后rewrite base保持 状态
  • 无法加载训练好的.h5权重

    Unable to open file unable to open file name h5 errno 2 error message No such file or directory 确认模型是正确的情况下 最好的办法是升级h5py
  • AD中如何做出爱心❤的丝印

    不知道有没有小伙伴试过在AD中做一个爱心的丝印 今天在这里就跟大家分享一下 如何做一个带爱心 的丝印 具体方法就是我们在字符串输入框中输入相应的内容和 爱心 的话可以通过搜狗输入法打一个 心 字就会弹出哦 选择字体 MS UI Gothic
  • 《逻辑与计算机设计基础(原书第5版)》——2.9 硬件描述语言—VHDL

    2 9 硬件描述语言 VHDL 由于硬件描述语言用来描述和设计硬件 故在使用该语言编程时 应牢记底层的硬件实现 特别是当你的设计将用来综合时 例如 如果忽略将要生成的硬件 那么你可能会用低效的硬件描述语言设计出一个大且复杂的门级结构 而实际
  • Harmony Codelab 样例—弹窗基本使用

    一 介绍 本篇 Codelab 主要基于 dialog 和 button 组件 实现弹窗的几种自定义效果 具体效果有 1 警告弹窗 点击确认按钮弹窗关闭 2 确认弹窗 点击取消按钮或确认按钮 触发对应操作 3 加载弹窗 展示加载中效果 4
  • 我与西门子的面试全过程(一面+二面)_2008校园招聘_笔试与面试分享_UNUS.CN

    导读 很庆幸自己能够参加世界500强企业 西门子的面试 经过了一面和二面 虽然最后没有被录取 但高兴的是我们广工有两个进了 在这里祝福他们 今天终于有空 写下自己与西门子的全过程 写下这篇文章 并不是想炫耀 而是真心希望对后来者有帮助 在学
  • 【LeetCode】426. 将二叉搜索树转化为排序的双向链表(剑指 Offer 36)

    一 题目 将一个 二叉搜索树 就地转化为一个 已排序的双向循环链表 对于双向循环列表 你可以将左右孩子指针作为双向循环链表的前驱和后继指针 第一个节点的前驱是最后一个节点 最后一个节点的后继是第一个节点 特别地 我们希望可以 就地 完成转换
  • 【PaddlePaddle】 mnist手写数字识别(卷积神经网络)

    这篇文章主要讲解了卷积神经网络的使用 卷积神经网络可以用来提取图像特征 所以在计算机视觉上有很好的效果 系统 ubuntu18 04 python版本 python2 7 目录 训练模型 进行预测 完整代码 训练模型 先把需要用到的模块导入
  • 【日常总结】c++静态成员为啥要在类外进行初始化

    解释 类的静态成员变量内存不属于实例化的类 在类内只起到申明的作用 必须要在类外进行初始化 这个说法不严谨 类外主要是进行定义 分配内存 同时也可以赋初始值 代码例子 test h pragma once include
  • 【云原生之Docker实战】使用Docker部署Wizard文档管理系统

    云原生之Docker实战 使用Docker部署Wizard文档管理系统 一 Wizard介绍 1 Wizard简介 2 Wizard特点 二 检查宿主机系统版本 三 检查本地docker环境 1 检查docker服务状态 2 检查docke
  • 你看鱿鱼这么便宜,所以是不是很可怜?

    本文非技术分享 可能属于逻辑思考 再一次做梦 如下片段 有个朋友聊天问我 你看鱿鱼 那么便宜 是不是很可怜 我的内心 贵或者不贵 从哪能体现出它可怜不可怜呢 这逻辑有问题 我的回答 有点 又一次从梦中醒来 立马记录下做了什么梦 仅此而已 好
  • Android8.0、9.0安装包解析失败

    根据google官网得知 在8 0以上权限控制的更加严格 应用内安装下载更新的apk都需要申请 安装外面应用 权限才能去安装新应用 如果没有申请否则无法安装 顺便附上6 0 7 0设备解决方案 一 设备6 0
  • MDK 5.10 -- Reading one or more Pack descriptions failed

    MDK 5 10 的Pack Install 提示如下错误 解决办法 1 去掉 C Keil v5 ARM Pack Keil STM32L0xx DFP 1 5 0 Keil STM32L0xx DFP pdsc 文件的只读属性 2 用M
  • canvas生成自定义大小图片

    场景 比如移动端签名 一张canvas画布 在任意位置书写之后 生成一张图片 如果这种图片要放到某一个签名的位置会显的特别大 我们来解决这个问题 一 生成canvas图片 通过 canvas toDataURL image png 1 生成
  • Three.js文件及其插件链接

    Three js master包下载 由于官网three js master文件下载非常缓慢甚至经常下载失败 为了广大WebGL程序员的方便 博主专门下载下来放在百度网盘中分享给大家 百度网盘链接 链接 百度网盘 请输入提取码 提取码 0j
  • SQL批量删除数据操作

    SQL批量删除数据操作 文章目录 SQL批量删除数据操作 sql语句 DELETE和TRUNCATE区别 sql语句 删除数据 避免这么写 删除表全部数据 DELETE FROM student 删除指定数据 DELETE FROM stu
  • IOS 解决安装POD报You don't have write permissions for the /usr/bin directory的错误

    这段时间开始做IOS开发 使用pod管理第三方库 由于一些第三方不兼容最新的pod 所以要安装旧版本的pod 其中遇到的问题就是 You don t have write permissions for the usr bin direct
  • 学前steam教育范围

    近几年什么教育趋势席卷全球 发展势头如火如荼 相信很多人立刻会想到STEAM教育 该教育最早由美国提出 一直备受瞩目 STEAM教育的核心理念是强调学科之间的联系 以整合的形式进行教育 格物斯坦表示学前教育阶段的孩子学习STEAM教育对今后
  • Spring application context not configured for this file

    出现这个意思是新建的Spring配置文件没有被加入到spring里面 我是这样理解的 简单几步搞定 选择编辑器左上角file gt Project Structure 然后 最后别忘Apply OK
  • 构建实时数据可视化监控的全栈实现(Kafka+Spark+TimescaleDB+Flask+Node.js)

    因为项目需求 需要构建一个实时的数据监控系统 把平台上报的业务数据以1分钟的粒度进行呈现 为此我构建了以下的一个架构来实现 平台上报的业务数据会实时的发送消息给Kafka 例如平台每次为车辆进行OTA升级时 会发送一个OTA业务请求的事件