Python响应式类库RxPy简介

2023-10-29

RxPy是非常流行的响应式框架Reactive X的Python版本,其实这些版本都是一样的,只不过是各个语言的实现不同而已。因此,如果学会了其中一种,那么使用其他的响应式版本也是轻而易举的。之前我就听说过这个框架,最近决定好好研究一下。

基本概念

Reactive X中有几个核心的概念,先来简单介绍一下。

Observable和Observer(可观察对象和观察者)

首先是Observable和Observer,它们分别是可观察对象和观察者。Observable可以理解为一个异步的数据源,会发送一系列的值。Observer则类似于消费者,需要先订阅Observable,然后才可以接收到其发射的值。可以说这组概念是设计模式中的观察者模式和生产者-消费者模式的综合体。

Operator(操作符)

另外一个非常重要的概念就是操作符了。操作符作用于Observable的数据流上,可以对其施加各种各样的操作。更重要的是,操作符还可以链式组合起来。这样的链式函数调用不仅将数据和操作分隔开来,而且代码更加清晰可读。一旦熟练掌握之后,你就会爱上这种感觉的。

Single(单例)

在RxJava和其变体中,还有一个比较特殊的概念叫做Single,它是一种只会发射同一个值的Observable,说白了就是单例。当然如果你对Java等语言比较熟悉,那么单例想必也很熟悉。

Subject(主体)

主体这个概念非常特殊,它既是Observable又是Observer。正是因为这个特点,所以Subject可以订阅其他Observable,也可以将发射对象给其他Observer。在某些场景中,Subject会有很大的作用。

Scheduler(调度器)

默认情况下Reactive X只运行在当前线程下,但是如果有需要的话,也可以用调度器来让Reactive X运行在多线程环境下。有很多调度器和对应的操作符,可以处理多线程场景下的各种要求。

Observer和Observable

先来看看一个最简单的例子,运行的结果会依次打印这些数字。这里的of是一个操作符,可以根据给定的参数创建一个新的Observable。创建之后,就可以订阅Observable,三个回调方法在对应的时机执行。一旦Observer订阅了Observable,就会接收到后续Observable发射的各项值。

from rx import of

ob = of(1, 2, 34, 5, 6, 7, 7)
ob.subscribe(
    on_next=lambda i: print(f'Received: {i}'),
    on_error=lambda e: print(f'Error: {e}'),
    on_completed=lambda: print('Completed')

)

这个例子看起来好像很简单,并且看起来没什么用。但是当你了解了Rx的一些核心概念,就会理解到这是一个多么强大的工具。更重要的是,Observable生成数据和订阅的过程是异步的,如果你熟悉的话,就可以利用这个特性做很多事情。

操作符

在RxPy中另一个非常重要的概念就是操作符了,甚至可以说操作符就是最重要的一个概念了。几乎所有的功能都可以通过组合各个操作符来实现。熟练掌握操作符就是学好RxPy的关键了。操作符之间也可以用pipe函数连接起来,构成复杂的操作链。

from rx import of, operators as op
import rx

ob = of(1, 2, 34, 5, 6, 7, 7)
ob.pipe(
    op.map(lambda i: i ** 2),
    op.filter(lambda i: i >= 10)
).subscribe(lambda i: print(f'Received: {i}'))

在RxPy中有大量操作符,可以完成各种各样的功能。我们来简单看看其中一些常用的操作符。如果你熟悉Java8的流类库或者其他函数式编程类库的话,应该对这些操作符感到非常亲切。

创建型操作符

首先是创建Observable的操作符,列举了一些比较常用的创建型操作符。

操作符 作用
just(n) 只包含1个值的Observable
repeated_value(v,n) 重复n次值为v的Observable
of(a,b,c,d) 包含所有参数的Observable
empty() 一个空的Observable
from_iterable(iter) 用iterable创建一个Observable
generate(0, lambda x: x < 10, lambda x: x + 1) 用初始值和循环条件生成Observable
interval(n) 以n秒为间隔定时发送整数序列的Observable

过滤型操作符

过滤型操作符的主要作用是对Observable进行筛选和过滤。

操作符 作用
debounce 按时间间隔过滤,在范围内的值会被忽略
distinct 忽略重复的值
elementAt 只发射第n位的值
filter 按条件过滤值
first/last 发射首/尾值
skip 跳过前n个值
take 只取前n个值

转换型操作符

操作符 作用
flatMap 转换多个Observable的值并将它们合并为一个Observable
groupBy 对值进行分组,返回多个Observable
map 将Observable映射为另一个Observable
scan 将函数应用到Observable的每个值上,然后返回后面的值

算术操作符

操作符 作用
average 平均数
count 个数
max 最大值
min 最小值
reduce 将函数应用到每个值上,然后返回最终的计算结果
sum 求和

Subject

Subject是一种特殊的对象,它既是Observer又是Observable。不过这个对象一般不太常用,但是假如某些用途还是很有用的。所以还是要介绍一下。下面的代码,因为订阅的时候第一个值已经发射出去了,所以只会打印订阅之后才发射的值。

from rx.subject import Subject, AsyncSubject, BehaviorSubject, ReplaySubject

# Subject同时是Observer和Observable

print('--------Subject---------')
subject = Subject()
subject.on_next(1)
subject.subscribe(lambda i: print(i))
subject.on_next(2)
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 2 3 4

另外还有几个特殊的Subject,下面来介绍一下。

ReplaySubject

ReplaySubject是一个特殊的Subject,它会记录所有发射过的值,不论什么时候订阅的。所以它可以用来当做缓存来使用。ReplaySubject还可以接受一个bufferSize参数,指定可以缓存的最近数据数,默认情况下是全部。

下面的代码和上面的代码几乎完全一样,但是因为使用了ReplaySubject,所以所有的值都会被打印。当然大家也可以试试把订阅语句放到其他位置,看看输出是否会产生变化。

# ReplaySubject会缓存所有值,如果指定参数的话只会缓存最近的几个值
print('--------ReplaySubject---------')
subject = ReplaySubject()
subject.on_next(1)
subject.subscribe(lambda i: print(i))
subject.on_next(2)
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 1 2 3 4

BehaviorSubject

BehaviorSubject是一个特殊的Subject,它只会记录最近一次发射的值。而且在创建它的时候,必须指定一个初始值,所有订阅它的对象都可以接收到这个初始值。当然如果订阅的晚了,这个初始值同样会被后面发射的值覆盖,这一点要注意。

# BehaviorSubject会缓存上次发射的值,除非Observable已经关闭
print('--------BehaviorSubject---------')
subject = BehaviorSubject(0)
subject.on_next(1)
subject.on_next(2)
subject.subscribe(lambda i: print(i))
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 2 3 4

AsyncSubject

AsyncSubject是一个特殊的Subject,顾名思义它是一个异步的Subject,它只会在Observer完成的时候发射数据,而且只会发射最后一个数据。因此下面的代码仅仅会输出4.假如注释掉最后一行co_completed调用,那么什么也不会输出。

# AsyncSubject会缓存上次发射的值,而且仅会在Observable关闭后开始发射
print('--------AsyncSubject---------')
subject = AsyncSubject()
subject.on_next(1)
subject.on_next(2)
subject.subscribe(lambda i: print(i))
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 4

Scheduler

虽然RxPy算是异步的框架,但是其实它默认还是运行在单个线程之上的,因此如果使用了某些会阻碍线程运行的操作,那么程序就会卡死。当然针对这些情况,我们就可以使用其他的Scheduler来调度任务,保证程序能够高效运行。

下面的例子创建了一个ThreadPoolScheduler,它是基于线程池的调度器。两个Observable用subscribe_on方法指定了调度器,因此它们会使用不同的线程来工作。

import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as op

import multiprocessing
import time
import threading
import random


def long_work(value):
    time.sleep(random.randint(5, 20) / 10)
    return value


pool_schedular = ThreadPoolScheduler(multiprocessing.cpu_count())

rx.range(5).pipe(
    op.map(lambda i: long_work(i + 1)),
    op.subscribe_on(pool_schedular)
).subscribe(lambda i: print(f'Work 1: {threading.current_thread().name}, {i}'))

rx.of(1, 2, 3, 4, 5).pipe(
    op.map(lambda i: i * 2),
    op.subscribe_on(pool_schedular)
).subscribe(lambda i: print(f'Work 2: {threading.current_thread().name}, {i}'))

如果你观察过各个操作符的API的话,可以发现大部分操作符都支持可选的Scheduler参数,为操作符指定一个调度器。如果操作符上指定了调度器的话,会优先使用这个调度器;其次的话,会使用subscribe方法上指定的调度器;如果以上都没有指定的话,就会使用默认的调度器。

应用场景

好了,介绍了一些Reactive X的知识之后,下面来看看如何来使用Reactive X。在很多应用场景下,都可以利用Reactive X来抽象数据处理,把概念简单化。

防止重复发送

很多情况下我们都需要控制事件的发生间隔,比如有一个按钮不小心按了好几次,只希望第一次按钮生效。这种情况下可以使用debounce操作符,它会过滤Observable,小于指定时间间隔的数据会被过滤掉。debounce操作符会等待一段时间,直到过了间隔时间,才会发射最后一次的数据。如果想要过滤后面的数据,发送第一次的数据,则要使用throttle_first操作符。

下面的代码可以比较好的演示这个操作符,快速按回车键发送数据,注意观察按键和数据显示之间的关系,还可以把throttle_first操作符换成debounce操作符,然后再看看输出会发生什么变化,还可以完全注释掉pipe中的操作符,再看看输出会有什么变化。

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

# debounce操作符,仅在时间间隔之外的可以发射

ob = Subject()
ob.pipe(
    op.throttle_first(3)
    # op.debounce(3)
).subscribe(
    on_next=lambda i: print(i),
    on_completed=lambda: print('Completed')
)

print('press enter to print, press other key to exit')
while True:
    s = input()
    if s == '':
        ob.on_next(datetime.datetime.now().time())
    else:
        ob.on_completed()
        break

操作数据流

如果需要对一些数据进行操作,那么同样有一大堆操作符可以满足需求。当然这部分功能并不是Reactive X独有的,如果你对Java 8的流类库有所了解,会发现这两者这方面的功能几乎是完全一样的。

下面是个简单的例子,将两个数据源结合起来,然后找出来其中所有的偶数。

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

# 操作数据流
some_data = rx.of(1, 2, 3, 4, 5, 6, 7, 8)
some_data2 = rx.from_iterable(range(10, 20))
some_data.pipe(
    op.merge(some_data2),
    op.filter(lambda i: i % 2 == 0),
    # op.map(lambda i: i * 2)
).subscribe(lambda i: print(i))

再或者一个利用reduce的简单例子,求1-100的整数和。

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

rx.range(1, 101).pipe(
    op.reduce(lambda acc, i: acc + i, 0)
).subscribe(lambda i: print(i))
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python响应式类库RxPy简介 的相关文章

随机推荐

  • Unity3D游戏开发之设置动画(Animations)属性

    通过创建角色动画Avatar 在新的动画系统Mecanim中 Unity就设置了角色动画的骨架和蒙皮信息 从而就可以在Unity中实现角色动画了 切换到动画 Animations 选项卡 选中导入动画 Import Animation 的选
  • 人工神经网络的设计与实现(二) 感知机

    感知机 感知机 perceptron 是ANN的基本单元 至少我现在是这么觉得的 如果我学到后来发现不是 会来更正的 感知机 如下图 就是伸出几只小触手去感知这个世界 感知 感知 然后触手获取数据的加权和通过函数 f 得到的值即为该感知机的
  • 【热门框架】Mybatis-Plus标准CRUD操作

    MyBatis Plus提供了一系列标准的CRUD操作 包括insert delete update和select 下面是这些操作的指引 插入数据 1 使用实体类进行插入 User user new User user setName To
  • Idea license server地址

    以下都可以试试 http idea iteblog com key phphttp intellij mandroid cn http idea imsxm com https jetlicense nss im
  • 系统扩容心得

    author skatetime 2010 11 10 系统扩容心得 由于业务的快速发展 系统需要扩容 我们这次系统扩容动作比较小 相对不是很复杂 但过程是曲折的 结果是完美的 从开始准备到完成实施期间的每一个小细节都需要我们倍加注意 因为
  • vue-cli配置文件的查看和修改

    针对vue cli gt 3的版本 介绍两种修改方式 1 vue ui 在终端执行 vue ui 会打开页面 可以导入要管理的项目 会打开页面如下 2 在根目录下新建vue config js文件 添加要修改的配置
  • SSH整合中文

    在struts2里面配置一个常量
  • 第37章_瑞萨MCU零基础入门系列教程之DAC数模转换模块

    本教程基于韦东山百问网出的 DShanMCU RA6M5开发板 进行编写 需要的同学可以在这里获取 https item taobao com item htm id 728461040949 配套资料获取 https renesas do
  • http服务

  • ElasticSearch性能优化总结

    Elasticsearch是目前大数据领域最热门的技术栈之一 经过近8年的发展 已从0 0 X版升级至6 X版本 虽然增加了很多的特性和功能 但是在主体架构上 还是没有太多的变化 下面就把我对于ES使用实践的一些经验总结一下 供大家参考 也
  • VS2019未能返回新代码元素,可能是语法错误

    最近在写MFC的工程 在某次添加组件变量时 弹出提示框 未能返回新代码元素 可能是语法错误 检查了一遍没有语法错误 编译正常 网上所说的 将ncb文件删除就可以解决 找了半天没找到这个后缀名的文件 后来发现他们的帖子的发表时间都很老了 当初
  • 继电器驱动电路(各种单片机、CD4013触发器驱动电路图)

    继电器工作原理详解 附3种驱动电路图 继电器原理及分类 继电器知识点大全 看完一定有收获 线圈 继电器是一种电子控制器件 它具有控制系统 又称输入回路 和被控制系统 又称输出回路 通常应用于自动控制电路中 它实际上是用较小的电流去控制较大电
  • 基于ruoyi中shiro框架如何实现免密登录

    基于ruoyi中shiro框架如何实现免密登录 所做项目与第三方合作 系统间存在一些接口调用 需要做授权登录 我们的项目整体使用springboot框架结合部分ruoyi的后台管理框架 认证登陆采用了shiro框架 密码在数据库中经过盐值
  • vue-awesome-swiper 配置分页不显示

    使用 vue awesome swiper 的时候遇到一个问题 明明配置了分页 在页面上却没有展示出小圆点 数据量也是足够分页的 安装的 vue awesome swiper 是 4 1 1 版本的 这是 html 的代码片段
  • Android 蓝牙笔记-底层RFKILL驱动

    概念 RFKill 就是RF 射频 设备的开关 有类似一键关闭所有射频外设的功能 比如当我们在飞机上飞行开启飞行模式时候 所有这些RF相关的设备都需要关闭 linux的rfkill子系统提供了用于禁用系统中任何无线电发射器的通用接口 发射设
  • AD20-封装的创作及添加

    原理图你是可以画的大一点 丑一点 但是封装库一定要按一比一的比例去画 否者是会影响后期的制作 也有可能是不能做出来的 画原理图是在 这个文件中进行 Ctrl m 是测量中心距离 先选中一个中心 在选中另一个中心 gg 按两次 是设置栅格的长
  • 浅谈数组与链表的区别

    1 区别 1 数组元素地址需要连续内存空间 链表节点地址不需要连续内存空间 2 数组在最初就确定了成员数量 后期无法修改 链表的节点个数可动态增减 3 数组元素只能是同1种数据类型 链表节点可携带多种数据类型 4 数组从栈中分配空间 链表从
  • java元注解

    java元注解 本文涉及以下这些内容 如果不清楚的话 可以看一下 相信会对你有些许帮助 1 使用IntelliJ IDEA 2018查看字节码 2 使用IntelliJ IDEA 2018生成帮助文档 本文主要针对于java8 java8定
  • 第十四届蓝桥杯B组第一期模拟题

    1 十进制整数 2 在十进制中是 1 位数 在二进制中对应 10 是 2 位数 十进制整数 22 在十进制中是 2 位数 在二进制中对应 10110 是 5 位数 请问十进制整数 2022 在二进制中是几位数 include
  • Python响应式类库RxPy简介

    RxPy是非常流行的响应式框架Reactive X的Python版本 其实这些版本都是一样的 只不过是各个语言的实现不同而已 因此 如果学会了其中一种 那么使用其他的响应式版本也是轻而易举的 之前我就听说过这个框架 最近决定好好研究一下 基