建模杂谈系列215 ADBS Update V3

2023-11-13

说明

本来在完成这次量化实验之前不想再改版,但想想至少还有3个ADBS要在本次中完成,算下来改所花的时间还是少于因为不改而做的额外配置,所以还是改。

本次升级要解决几个问题:

  • 1 Redis Var的配置化管理
  • 2 Worker 的命令行配置(分发与非分发)

内容

1 Redis Var

目前看来,Redis Var有这么几种角色/功能:

  • 1 通用依赖。例如像时间轴这种的变量,每个worker都要使用。
  • 2 项目定制变量。每个项目都有自己流程的缓存变量,目前有一些项目的缓存还是不正确的,但是只是用于监控,也就放过了。

所以本次的改变是将所有的redis变量都集中在config文件里,有一些只和项目相关,使用上又是固定的变量,就直接用项目名称去格式化就好。

所以会涉及到所有的app和monitor,逐个扫过一遍,确保通用的redis_var会被集中管理。

2 Worker

Worker的分发模式(fetch)有助于各Worker进行并行执行,而顺序取(range)则可以保证每一条数据不会漏掉。

这两种模式区别非常明显,又都会用到,所以把这个参数通过执行命令的参数透到外面的命令行会比较灵活。

具体的执行Worker是逻辑性问题,不需要再修改。

3 操作

  • 1 先创建一个新的文件夹,用于存放新的模板文件
  • 2 以docker run -it 方式打开老的镜像

3.1 configs_base.py

先把最新的配置内容粘贴过来,然后逐个查看每个app中需要使用的redis变量。

# 通用Redis变量 - 默认都是本地redis
global_redis_var = {}
global_redis_var['redis_agent_host'] = redis_agent_host
global_redis_var['redis_connection_hash'] = redis_connection_hash
global_redis_var['some_var'] = None

# 项目用的appRedis变量 - 默认都是本地redis
app_redis_var = {}
app_redis_var['redis_agent_host'] = redis_agent_host
app_redis_var['redis_connection_hash'] = redis_connection_hash
app_redis_var['app01_PullToStep1MongoIn'] = 'BUFF.%s.step1_mongo_in.pf.app.app01_PullToStep1MongoIn.af.gp.0.uf.flow_count' % project_name
app_redis_var['app03_PullToStep1MongoOut'] = 'BUFF.%s.step1_mongo_in.pf.app.app01_PullToStep1MongoIn.af.gp.0.uf.flow_count' % project_name

# 监控用的Redis变量
monitor_redis_var = {}
monitor_redis_var['redis_agent_host'] = redis_agent_host
monitor_redis_var['redis_connection_hash'] = redis_connection_hash
monitor_redis_var['monitor03_Step1TasksClaimed'] ='BUFF.%s.step1_mongo_in.pf.app.monitor03_Step1TasksClaimed.af.gp.0.uf.last_stat' % project_name
monitor_redis_var['monitor04_StreamIn_count'] = 'BUFF.%s.step1_mongo_in.pf.app.app01_PullToStep1MongoIn.af.gp.0.uf.flow_count' % project_name

# 数值型转换(如果声明的变量缺少流程就会中断)
app01_toDoubleVarList = []
app03_toDoubleVarList = []

然后是app需要使用的redis变量,这些应该都是项目相关的固定变量。目前看来,app01是需要redis变量的。

3.2 app01_PullToStep1MongoIn.py

先修改这个。需要使用redis var来记录入系统流量,同时也需要维持数值化变量的任务。如果可以全部是数值的话就不用管。

..
from configs_base import global_redis_var,app_redis_var

..
toDoubleVarList = app01_config['toDoubleVarList']

..

import time 
redis_var =app_redis_var['app01_PullToStep1MongoIn']
redis_buff = req.post(redis_agent_host + 'getv/',json ={'k':redis_var}).json()['data']

3.3 app03_PullToStep1MongoOut.py

app03并不需要提供其他的统计,无需redis变量,但是要增加对于数值型变量的转换功能。

..
from configs_base import global_redis_var,app_redis_var

..
toDoubleVarList = app03_config['toDoubleVarList']

3.4 monitor01_StreamFlow.py

这个monitor主要是看项目默认的输入输出队列,不必改。

3.5 monitor02_DB_Recs.py

这个是看项目相关数据库的总条数,也不用改。

3.6 monitor03_Step1TasksClaimed.py

这个是监控入系统数据被分配的情况。为了减少Mongo的统计压力,采用了增量统计的方式,使用了一个redis变量记录上次的统计点。

这个也属于“项目间差异命名,项目内固定命名”的变量,所以也提出来,放在configs_base里定义,在这个程序体中直接引入就可以。

...

from configs_base import global_redis_var,monitor_redis_var

...

redis_var = monitor_redis_var['monitor03_Step1TasksClaimed']

3.7 monitor04_StreamIn_count.py

这是用于统计入系统流量的,同样需要修改redis变量。monitor04在上个版本已经改过,现在把redis_var的导入集中管理。

3.8 worker01_Go.py

之前会给这个worker01进行不同的命名,发现在多数情况下要定制的部分其实不多,所以可以做一个约定,将这个步骤的调度都命名为 worker01_Go.py,而对应的Worker,不管实现什么功能,都将对应的输出命名为: worker_af, worker_Chain_session_list 以及worker_Chain_session_dict。Worker的名称叫TheWorker.py

启动时,以argparse的方式对其进行调度,默认参数可以维持现状(这样就不必动sche.py)。

argparse的示例如下:

import argparse
# =================  解析参数
def get_arg():
    parser = argparse.ArgumentParser(description='Customized Arguments')
#     parser.add_argument('-p','--pkl', default='Meta')

    parser.add_argument('--is_fetch_mode')

    # 准备解析参数
    args = parser.parse_args()

    is_fetch_mode = args.is_fetch_mode

    return is_fetch_mode

if __name__ =='__main__':
    is_fetch_mode = get_arg() or False
    if is_fetch_mode is not False:
        is_fetch_mode = True 
    print('is_fetch_mode :', is_fetch_mode)

调用:

root@3b2281835e89:/workspace# python3 test.py
is_fetch_mode : False
root@3b2281835e89:/workspace# python3 test.py --is_fetch_mode=xx
is_fetch_mode : True

重写worker.py

import os 

print('>>>work is Running  ')
runcode =''
for some_app in ['worker01_Go.py']:
    runcode += str(os.system('python3 %s' % some_app))
print('>>>work RunCode :%s ' % runcode)

新建worker01_Go.py

...
import argparse
# =================  解析参数
def get_arg():
    parser = argparse.ArgumentParser(description='Customized Arguments')
#     parser.add_argument('-p','--pkl', default='Meta')

    parser.add_argument('--is_fetch_mode')
    parser.add_argument('--worker_name')
    parser.add_argument('--group_name')

    # 准备解析参数
    args = parser.parse_args()

    is_fetch_mode = args.is_fetch_mode
    worker_name = args.worker_name
    group_name = args.group_name

    return is_fetch_mode,worker_name,group_name


is_fetch_mode,worker_name,group_name = get_arg()

# 参数映射1
if is_fetch_mode is not None:
    is_fetch_mode = True 
else:
    is_fetch_mode = False
# 参数映射2
worker_name =worker_name or 'alice'

# 参数映射3
# 在init_projects.py定义
group_name =group_name or 'group1'

...
from TheWorker import worker_af, worker_Chain_session_list, worker_Chain_session_dict

这样worker01就可以固定下来不变了,以后每个项目中嵌入自己的worker就可以,worker中对应的实例、链和字典赋给要导入的变量。

如果worker没有定义会怎么样?

整个流是不会有影响的,使用worker是以os方式调用worker01的,如果worker01失败,主程序并不会出错。

3.9 CNT_worker.py

这类调度主要是在冷启动的时候加速处理,启动一定期限的不间断循环,工作结束时对应的容器也会自动销毁。

import os 

import argparse
# =================  解析参数
def get_arg():
    parser = argparse.ArgumentParser(description='Customized Arguments')
#     parser.add_argument('-p','--pkl', default='Meta')

    parser.add_argument('--cnt_limit')
    parser.add_argument('--is_fetch_mode')
    parser.add_argument('--worker_name')
    parser.add_argument('--group_name')

    # 准备解析参数
    args = parser.parse_args()

    cnt_limit = args.cnt_limit
    is_fetch_mode = args.is_fetch_mode
    worker_name = args.worker_name
    group_name = args.group_name

    return cnt_limit,is_fetch_mode,worker_name,group_name


cnt_limit,is_fetch_mode,worker_name,group_name = get_arg()

cnt_limit = cnt_limit or 10000
cnt_limit = int(cnt_limit)

is_fetch_mode = is_fetch_mode or 'yes'
if is_fetch_mode == 'no':
    is_fetch_mode = ''

worker_name = worker_name or ''
group_name = group_name or ''


aleary_run = 0

print('>>>work is Running  ')
for _ in range(cnt_limit):
    aleary_run+=1
    print('Already Run',aleary_run)
    runcode =''
    for some_app in ['worker01_Go.py --is_fetch_mode=%s --worker_name=%s --group_name=%s ' % (is_fetch_mode,worker_name,group_name) ]:
        runcode += str(os.system('python3 %s' % some_app))
    print('>>>work RunCode :%s ' % runcode)

3.10 Band_worker.py

这类调度主要是为了增加的实时数据的处理,在时间内一直循环,时间到时对应的容器也会自动销毁。Band_worker因为会常态运行,所以加一个参数,让其稍微停顿一下,一般一秒一次。

import os 

import time
def get_time_str1(ts = None,bias_hours=0):
    ts = ts or time.time()
    return time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(ts + bias_hours*3600))

import argparse
# =================  解析参数
def get_arg():
    parser = argparse.ArgumentParser(description='Customized Arguments')
#     parser.add_argument('-p','--pkl', default='Meta')

    parser.add_argument('--to_dt')
    parser.add_argument('--pace')
    parser.add_argument('--is_fetch_mode')
    parser.add_argument('--worker_name')
    parser.add_argument('--group_name')

    # 准备解析参数
    args = parser.parse_args()

    to_dt = args.to_dt
    pace = args.pace
    is_fetch_mode = args.is_fetch_mode
    worker_name = args.worker_name
    group_name = args.group_name

    return to_dt,pace,is_fetch_mode,worker_name,group_name


to_dt,pace,is_fetch_mode,worker_name,group_name = get_arg()

to_dt = to_dt or '2099-01-01 00:00:00'

pace = pace or 10000
pace = int(pace)

is_fetch_mode = is_fetch_mode or 'yes'
if is_fetch_mode == 'no':
    is_fetch_mode = ''

worker_name = worker_name or ''
group_name = group_name or ''



aleary_run = 0

print('>>>work is Running  ')
while True:
    aleary_run+=1
    print('Already Run',aleary_run)
    runcode =''
    for some_app in ['worker01_Go.py --is_fetch_mode=%s --worker_name=%s --group_name=%s ' % (is_fetch_mode,worker_name,group_name) ]:
        runcode += str(os.system('python3 %s' % some_app))
    print('>>>work RunCode :%s ' % runcode)

    if get_time_str1() >= to_dt:
        break
    if pace > 0:
        time.sleep(pace)

4 提交更改,生成新镜像

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

建模杂谈系列215 ADBS Update V3 的相关文章

  • 在 aws-elasticache 上使用 memcached 或 Redis

    我正在 AWS 上开发一个应用程序 并使用 AWS elasticache 进行缓存 我对使用 memcached 或 redis 感到困惑 我阅读了有关 redis 3 0 2 更新以及它现在如何等同于 memchached 的文章 ht
  • 在 sidekiq 上配置 redis 身份验证

    我想我错过了一些东西 因为我在文档中找不到如何编写 redis 实例的用户名和密码以与 sidekiq 一起使用 有没有办法做到这一点 或者是通过 ENV 变量 Sidekiq 将无法识别的 Redis 选项直接传递给 Redis 驱动程序
  • 如何将node.js管道传输到redis?

    我有很多数据要插入 SET INCR 到redis DB 所以我正在寻找pipeline http redis io topics pipelining 质量插入 http redis io topics mass insert通过node
  • Redis Docker compose无法处理RDB格式版本10

    我无法在 docker compose 文件中启动 redis 容器 我知道docker compose文件没问题 因为我的同事可以成功启动项目 我读到有一个删除 dump rdb 文件的解决方案 但我找不到它 我使用Windows机器 任
  • Node Js:Redis 作业在完成其任务后未完成

    希望你们做得很好 我在我的 Nodejs 项目中实现了 BullMQ Bull 的下一个主要版本 来安排发送电子邮件的作业 例如 发送忘记密码请求的电子邮件 所以 我编写了如下所示的代码 用户服务 await resetPasswordJo
  • Scala 使用的 Redis 客户端库建议

    我正在计划使用 Scala 中的 Redis 实例进行一些工作 并正在寻找有关使用哪些客户端库的建议 理想情况下 如果存在一个好的库 我希望有一个为 Scala 而不是 Java 设计的库 但如果现在这是更好的方法 那么仅使用 Java 客
  • Redis 队列工作程序在 utcparse 中崩溃

    我正在尝试按照以下教程获得基本的 rq 工作 https blog miguelgrinberg com post the flask mega tutorial part xxii background jobs https blog m
  • 节点应用程序之间共享会话?

    我目前有两个独立的节点应用程序在两个不同的端口上运行 但共享相同的后端数据存储 我需要在两个应用程序之间共享用户会话 以便当用户通过一个应用程序登录时 他们的会话可用 并且他们似乎已登录到另一个应用程序 在本例中 它是一个面向公众的网站和一
  • 如何使用redis发布/订阅

    目前我正在使用node js和redis来构建应用程序 我使用redis的原因是因为发布 订阅功能 该应用程序只是在用户进入用户或离开房间时通知经理 function publishMsg channel mssage redisClien
  • 如何配置Lettuce Redis集群异步连接池

    我正在配置我的生菜重新分配池 当我按照官方文档配置时 连接池无法正常初始化 无法获取连接 官方文档指出 RedisClusterClient clusterClient RedisClusterClient create RedisURI
  • 超出 Redis 连接/缓冲区大小限制

    在对我们的应用程序服务器进行压力测试时 我们从 Redis 中得到以下异常 ServiceStack Redis RedisException 无法连接到 redis host 6379 处的 redis 实例 gt System Net
  • 使用环境变量在 redis.conf 中设置动态路径

    我有一个环境变量MY HOME其中有一个目录的路径 home abc 现在 我有一个redis conf文件 我需要像这样设置这个路径 redis conf pidfile MY HOME local var pids redis pid
  • Redis 中存储整数和字符串的区别

    这两个命令有什么区别吗 LPUSH myset 123 LPUSH myset 123 我想存储大约 500 万个整数 并且我想以最有效的方式做到这一点 不 没有什么区别 两者都存储为字符串 从redis io http redis io
  • Redis 在键过期时更新排序集

    我有一个 Redis 服务器 其中包含一组键值对和一个排序集 提供这些键值对的键的索引 键值对可以进入 已完成 状态 此时需要在 1 小时后删除它们 这可以通过在键上设置到期时间来简单地实现 但从排序集中清除它们似乎更成问题 我可以有一个过
  • 集合成员的 TTL

    Redis 是否可以不为特定键而是为集合的成员设置 TTL 生存时间 我正在使用 Redis 文档提出的标签结构 数据是简单的键值对 标签是包含与每个标签对应的键的集合 例如 gt SETEX id id 1 100 Lorem ipsum
  • 使用 Redis 中的键

    我是 Redis 和键值数据库的新手 你能告诉我如何在redis中正确实现这种关系方法吗 我有一个关系表 其中两个键对应一个值 master id slave id 价值 Example 主站 ID 从属ID 价值 1 1 值1 2 1 值
  • 在redis中存储多个嵌套对象

    我想在redis中存储多个复杂的json数据 但不知道如何 这是我的 json 结构 users user01 username ally email email protected cdn cgi l email protection u
  • Redis 是否使用用户名进行身份验证?

    我已经在我的环境中设置了Redis 并且只看到了通过密码授权的部分 有没有办法也设置用户名 还是只能通过密码验证 Redis 6 上有 ACL 这些都有一个用户名 查看https redis io topics acl https redi
  • 检查 Redis 列表中是否已存在某个值

    我想知道是否有办法检查 redis 列表中是否已存在某个键 我无法使用集合 因为我不想强制唯一性 但我确实希望能够检查字符串是否确实存在 Thanks 您的选择如下 Using LREM如果发现则更换它 维护一个单独的SET与您的LIST
  • 如何高效地将数十亿数据插入Redis?

    我有大约 20 亿个键值对 我想将它们有效地加载到 Redis 中 我目前正在使用 Python 并使用 Pipe 如redis py https redis py readthedocs io en latest redis Redis

随机推荐

  • 取消GL.iNet路由器视频的密码

    每次路由器访问192 168 8 1 8083 action stream时总是无法访问 但是先进入192 168 8 1登录以后再去刷新视频就可以出来 即使取消外网登录验证也还是没效果 最后发现广大网友的意见是重新刷固件 先去GL iNe
  • 计算机专业毕业设计演示视频(论文+系统)_kaic

    https gongkailuxiangdu oss cn beijing aliyuncs com lx jsp 20 70912jspm E6 88 BF E5 B1 8B E9 94 80 E5 94 AE E7 AE A1 E7 9
  • 树莓派命令行显示乱码及异地组网问题

    写了一千多字没保存 很生气 这一条简写 命令行显示异常 首先检查树莓派设置里的地区 时区设置 一律改为中国 随后重要原因就是字库不全问题 命令行输入 sudo apt get install ttf way zenhei 一路确定安装字体
  • 程序的链接的三种方式

    程序的链接有以下三种方式 静态链接 在程序运行之前 先将各目标模块及它们所需的库函数链接成一个完整的可执行程序 以后不再拆开 装入时动态链接 将用户源程序编译后所得到的一组目标模块 在装入内存时 釆用边装入边链接的链接方式 运行时动态链接
  • 使用matlab进行灵敏性分析(附源代码)

    调用单纯形程序 function x z flg sgma simplexfun A A1 b c m n n1 cb xx A b are the matric in Ax b c is the matrix in max z cx A1
  • ChatGPT实现代码生成

    代码生成 就代码生成而言 ChatGPT 是一款卓越的工具 它为开发者提供强大的功能 ChatGPT 可以运用其出色的自然语言处理技术 深入理解和解释开发者的需求 快速生成适合的代码片段 对于那些繁琐的任务或者重复的代码 ChatGPT 能
  • 试题 C: 刷题统计

    题目链接 点击跳转 题目描述 小明决定从下周一开始努力刷题准备蓝桥杯竞赛 他计划周一至周五每天做 a 道题目 周六和周日每天做 b 道题目 请你帮小明计算 按照计划他将在第几天实现做题数大于等于 n 题 输入格式 输入一行包含三个整数 a
  • 系统资源占用高排查手段

    1 cpu高排查思路 1 top d 1每秒打印进程所占cpu资源 然后再按h显示线程占用 2 strace跟踪strace p 线程号 会打印该线程主要做什么操作 2 io高排查思路 lsof是一个展现的是当前系统所有进程 不是线程 打开
  • 端午过后公司面了一个字节来的要求月薪23K,明显感觉他背了很多面试题...

    最近有朋友去字节面试 面试前后进行了20天左右 包含4轮电话面试 1轮笔试 1轮主管视频面试 1轮hr视频面试 据他所说 80 的人都会栽在第一轮面试 要不是他面试前做足准备 估计都坚持不完后面几轮面试 其实 第一轮的电话面试除了一些常规的
  • Redis数据结构——QuickList、SkipList、RedisObjective

    承接上文 本文主要介绍QuickList SkipList RedisObjective 四 Redis数据结构 QuickList 问题1 ZipList虽然节省内存 但申请内存必须是连续空间 如果内存占用较多 申请内存效率很低 怎么办
  • ObjectArx 学习笔记(一)--入口函数acrxEntryPoint

    参考资料 AutoCAD 2000 ARX二次开发实例精粹 1 Arx程序的初始化 新建完工程之后 Arx程序的初始化在acrxEntryPoint 函数的AcRx kInitAppMsg事件中 或该事件调用的函数中进行 例如InitApp
  • 【PS】高低频磨皮

    一 原理 将皮肤纹理的信息储存在高频的图层中 将皮肤颜色的信息储存在低频的图层中 从而分开皮肤的颜色和纹理 达到快速修复皮肤的效果 二 步骤 1 建立高低频图层 2 低频图层 3 高频图层 图像 应用图像 混合模式改为线性光
  • 以http协议实现onvif协议并完成对IPC摄像头的监控

    文章目录 目录 文章目录 前言 1实现http连接 2 获取设备编码参数 3 设置摄像头相关参数 总结 前言 因为工作上的原因 需要接入IPC摄像头 实现监控功能 因而开始了对于IPC摄像头的学习之路 因为要做到通用 所以目光直接锁定了on
  • python爬虫增加多线程获取数据

    Python爬虫应用领域广泛 并且在数据爬取领域处于霸主位置 并且拥有很多性能好的框架 像Scrapy Request BeautifuSoap urlib等框架可以实现爬行自如的功能 只要有能爬取的数据 Python爬虫均可实现 数据信息
  • 国产数据库产品清单

    01 提到国产数据库 圈儿内的朋友多数会说出国产数据库 四大家族 达梦 金仓 南大 神通 那么除了这四家 你还是否还了解其他的国产数据库产品 随着国内信息技术的快速发展 以及近几年去 O 的强势浪潮 在国内各数据库厂商的不断努力下 国产数据
  • 区块链实验室(14) - 编译FISCO-BCOS

    FISCO BCOS是一种区块链平台 与Hyperledger和Ethereum有些不同 详见FISCO BCOS 区块链 编译FISCO BCOS源码的目的是修改或者新增其中功能模块 进行对比实验 验证新想法 新创意的效果 编译的步骤很简
  • SQLi-Labs 学习笔记(Less 41-50)

    点击打开链接 Less 41 基于错误的POST型单引号字符型注入 先打开网页查看 Welcome Dhakkan 与之前讲的Less 40的区别 plain view plain copy sql SELECT FROM users WH
  • WPF性能优化经验总结

    原文地址 WPF性能优化经验总结 痴鸟 博客园 WPF性能优化一 Rendering Tier 1 根据硬件配置的不同 WPF采用不同的Rendering Tier做渲染 下列情况请特别注意 因为在这些情况下 即使是处于Rendering
  • 服务器添加网卡

    原因 因为网络原因服务器需要添加网卡 1 确定主板卡槽 是否可以添加网卡 2 命令ip a 查看现有网卡 3 命令 cd etc sysconfig network scripts 查看文件列表 enpls0 网卡对应文件ifcfg enp
  • 建模杂谈系列215 ADBS Update V3

    说明 本来在完成这次量化实验之前不想再改版 但想想至少还有3个ADBS要在本次中完成 算下来改所花的时间还是少于因为不改而做的额外配置 所以还是改 本次升级要解决几个问题 1 Redis Var的配置化管理 2 Worker 的命令行配置