从Redis读取数据到Flink

2023-12-12

我一直在尝试找到一个连接器来将数据从Redis读取到Flink。 Flink 的文档包含用于写入 Redis 的连接器的描述。我需要在 Flink 作业中从 Redis 读取数据。在使用 Apache Flink 进行数据流传输,Fabian 提到可以从 Redis 读取数据。可用于此目的的连接器是什么?


我们正在生产中运行一个大致如下所示的

class RedisSource extends RichSourceFunction[SomeDataType] {

  var client: RedisClient = _

  override def open(parameters: Configuration) = {
    client = RedisClient() // init connection etc
  }

  @volatile var isRunning = true

  override def cancel(): Unit = {
    isRunning = false
    client.close()
  }

  override def run(ctx: SourceContext[SomeDataType]): Unit = while (isRunning) {
      for {
        data <- ??? // get some data from the redis client
      } yield ctx.collect(SomeDataType(data))

  }
} 

我认为这实际上取决于您需要从 Redis 获取什么。上面的代码可用于从列表/队列中获取消息,转换/推送,然后将其从队列中删除。 Redis 还支持 Pub/Sub,因此可以订阅、获取 SourceConext 并向下游推送消息。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

从Redis读取数据到Flink 的相关文章

  • 2 个具有共享 Redis 依赖的 Helm Chart

    目前 我有 2 个 Helm Charts Chart A 和 Chart B Chart A 和 Chart B 对 Redis 实例具有相同的依赖关系 如Chart yaml file dependencies name redis v
  • Redis是如何实现高吞吐量和高性能的?

    我知道这是一个非常普遍的问题 但是 我想了解允许 Redis 或 MemCached Cassandra 等缓存 以惊人的性能极限工作的主要架构决策是什么 如何维持连接 连接是 TCP 还是 HTTP 我知道它完全是用C写的 内存是如何管理
  • 我可以将 flink RocksDB 状态后端与本地文件系统一起使用吗?

    我正在探索使用 FlinkrocksDb 状态后端 文档似乎暗示我可以使用常规文件系统 例如 file data flink checkpoints 但代码 javadoc 仅在此处提到 hdfs 或 s3 选项 我想知道是否可以将本地文件
  • 如何使 Redis 缓存中数据层次结构(树)的部分内容无效

    我有一些产品数据 需要在 Redis 缓存中存储多个版本 数据由 JSON 序列化对象组成 获取普通 基本 数据的过程很昂贵 将其定制为不同版本的过程也很昂贵 因此我想缓存所有版本以尽可能进行优化 数据结构看起来像这样 BaseProduc
  • 在 Redis 上为 Django 和 Express.js 应用程序共享会话存储

    我想创建一个包含一些登录用户的 Django 应用程序 另一方面 由于我想要一些实时功能 所以我想使用 Express js 应用程序 现在的问题是 我不希望身份不明的用户访问 Express js 应用程序的日期 因此 我必须在 Expr
  • Redis+Docker+Django - 错误 111 连接被拒绝

    我正在尝试使用 Redis 作为使用 Docker Compose 的 Django 项目的 Celery 代理 我无法弄清楚我到底做错了什么 但尽管控制台日志消息告诉我 Redis 正在运行并接受连接 事实上 当我这样做时 docker
  • Amazon Elasticache Redis 集群 - 无法获取端点

    我需要获取 Amazon Elasticache 中 Redis 集群的终端节点 以下代码适用于 Memcached 集群 但不适用于 Redis import com amazonaws auth AWSCredentials impor
  • 节点应用程序之间共享会话?

    我目前有两个独立的节点应用程序在两个不同的端口上运行 但共享相同的后端数据存储 我需要在两个应用程序之间共享用户会话 以便当用户通过一个应用程序登录时 他们的会话可用 并且他们似乎已登录到另一个应用程序 在本例中 它是一个面向公众的网站和一
  • 在 Spring 4 中干掉通用的 RedisTemplate

    我读到你可以拥有 Autowired从 Spring 4 开始泛型 这太棒了 我有一个摘要RedisService我想参加的课程 Autowired一个通用的 RestTemplate 如下所示 public abstract class
  • 使用环境变量在 redis.conf 中设置动态路径

    我有一个环境变量MY HOME其中有一个目录的路径 home abc 现在 我有一个redis conf文件 我需要像这样设置这个路径 redis conf pidfile MY HOME local var pids redis pid
  • Redis 中存储整数和字符串的区别

    这两个命令有什么区别吗 LPUSH myset 123 LPUSH myset 123 我想存储大约 500 万个整数 并且我想以最有效的方式做到这一点 不 没有什么区别 两者都存储为字符串 从redis io http redis io
  • 将文件传递给活动作业/后台作业

    我通过标准文件输入接收请求参数中的文件 def create file params file upload Upload create file file filename img png end 但是 对于大型上传 我想在后台作业中执行
  • Apache Flink 中的并行度

    我可以为 Flink 程序中任务的不同部分设置不同的并行度吗 例如 Flink 如何解释以下示例代码 两个自定义实践者MyPartitioner1 MyPartitioner2 将输入数据划分为两个4和2个分区 partitionedDat
  • 批量将Dictionary中的数据设置到Redis中

    我正在使用 StackExchange Redis DB 插入键值对字典Batch如下 private static StackExchange Redis IDatabase database public void SetAll
  • 在redis中存储多个嵌套对象

    我想在redis中存储多个复杂的json数据 但不知道如何 这是我的 json 结构 users user01 username ally email email protected cdn cgi l email protection u
  • redis能完全取代mysql吗?

    简单的问题 我是否可以使用 redis 而不是 mysql 来处理各种 Web 应用程序 社交网络 地理位置服务等 IT 领域没有什么是不可能的 但有些事情可能会变得极其复杂 将键值存储用于全文搜索之类的事情可能会非常痛苦 另外 据我所知
  • JedisPoolConfig 不可分配给 GenericObjectPoolConfig

    我有一个基于 Spring 的 Java Web 应用程序托管在 Heroku 上 我正在尝试使用 Redis 实现来利用 Spring 缓存抽象 当服务器启动时 我收到一条错误消息 Type redis clients jedis Jed
  • Flink 在 Kubernetes 上的部署和 Native Kubernetes 有什么不同

    黑白的主要区别是什么原生 Kubernetes https ci apache org projects flink flink docs stable ops deployment native kubernetes html and 库
  • 如何高效地将数十亿数据插入Redis?

    我有大约 20 亿个键值对 我想将它们有效地加载到 Redis 中 我目前正在使用 Python 并使用 Pipe 如redis py https redis py readthedocs io en latest redis Redis
  • Redis 客户端忽略其上设置的配置选项并尝试连接到默认 IP 127.0.01

    在AWS中 我使用ElastiCache Redis服务器并使用节点作为后端和 promise redis 包 这就是我尝试连接到我的 redis 服务器端点的方法 client redis createClient host my red

随机推荐

  • 如何让程序使用 Python GUI 中的输入框等待输入?

    这是我用来启动程序主要部分的函数的代码 但是我想要某种循环或创建十个问题的东西 但在进入下一个问题之前等待来自输入框的输入 有任何想法吗 def StartGame root Tk root title Maths Quiz Trigono
  • 使用 Xcode 4.2 ios 5.0 构建的应用程序安装在装有 ios 4.3.5 的 iPhone 上时崩溃

    我有一个应用程序将不再在 ios 4 x 上构建 上周它运行良好 现在我可以确认有 2 位使用旧版 ios 的人无法在 Test Flight 上启动该应用程序 1 人 ios 4 3 x 无法再在 Xcode 中构建应用程序 调试器输出如
  • 调试代码优先的实体框架迁移代码

    我首先在我的网站中使用实体框架代码 我只是想知道是否有任何方法可以调试迁移代码 你知道 比如设置断点之类的 我正在使用包管理器控制台来更新数据库Update Database Thanks 我知道 EF Code First Migrati
  • setScrollable 不适用于 NSTextFieldCell

    我希望我的每个表格单元格都可以滚动 因为它是可编辑的 我正在使用 self nsChildTextFieldObj NSTextFieldCell alloc init self nsChildTextFieldObj setControl
  • 选择多个选项

    为了一些看似简单的事情而把我的头撞到墙上 这里是
  • R Markdown 中的图形与 html 输出的交叉引用不起作用

    我想展示对我使用 R Markdown 创建的 HTML 文档中的图片的良好交叉引用 然而 即使我跟着关于交叉引用的书本部分 我无法获得在最终 HTML 输出中显示的引用 我在 R Studio 工作 如果有帮助的话 Rmd 文件 titl
  • 列复制并粘贴到行中

    我有一个包含以下内容的文件 sensor write reg client 0x57 0x00 sensor write reg client 0x58 0x00 sensor write reg client 0x59 0x00 sens
  • 将.c文件重命名为.cpp,导入Cython库失败

    我有一个正在运行的 Cython 程序 其中包含一些 C 库和自定义 C 代码 最近 我不得不将我的项目切换到 C 所以我将所有 C 代码重命名为 cpp Cython 编译良好并生成了 so文件 但是 当我尝试在 Python 中导入库时
  • 从现有按钮创建按钮数组(集合)

    有没有一种简单的方法可以从表单上的现有按钮创建按钮集合 在 c 中 我的表单上已经有一系列按钮 我想使用索引来访问它们 例如 myButtonArray 0 ForeColor Do something with it 这可以做到吗 编辑
  • 如何在c中获取时间戳

    我想获取我登录 c 的时间戳 我写了一个函数来获取时间戳 但是当我返回变量时我得到不同的值 My code include
  • 如何使用 Android SDK 和 AVD 管理器下载 Google API(因为它提供 SHA-1 MessageDigest 不可用)?

    我无法使用 Android SDK 和 AVD 管理器下载 Google API 每当尝试时 我都会收到以下错误 SHA 1 MessageDigest 不可用 有什么想法吗 我找到了在 Windows XP 环境中执行此操作的解决方法或正
  • Python:退出for循环?

    我对 SO 做了一些研究 并且知道有人提出了许多类似的问题 但我无法完全得到答案 不管怎样 我正在尝试构建一个库来使用 塞萨尔数字 技术 加密 字符串 这意味着我必须获取该字符串并将每个字母替换为字母表中 X 位置之外的另一个字母 我希望这
  • boost:asio::read 或 boost:asio::async_read 超时

    是的 我知道对此有一些疑问time out in boost asio 我的问题可能太简单了asio大伙在这里解决 我在用boost asio在 TCP 协议上以尽可能快的速度通过网络连续循环读取数据 跟随功能ReadData 不断接到工作
  • jsf 嵌入复合组件后如何禁用 ViewHandler 中的元素?

    如果满足某些条件 我将使用 ViewHandler 阻止任何访问页面上的所有输入元素 这对于 主 xhtml 文件中的输入元素非常有效 但复合组件中的输入元素不会被阻止 我认为这与 JSF 仅在我的 ViewHandler 完成其工作后才嵌
  • 为什么当 dir="rtl" 时表格边距会出错?

    when I change the page direction to rtl the table margins is go incorrect as in this picture but when I only remove the
  • CSS 类特异性顺序

    我定义了自己的 CSS 类 my ui table td border width 1px border 0 padding 5px 10px border style hidden border color inherit 默认 Prim
  • Meteor - 为什么我应该尽可能使用 this.userId 而不是 Meteor.userId() ?

    判断从这条评论作者 David Glasser 在 GitHub 问题中 this userId是主要 API 并且Meteor userId 是 JavaScript 新手的语法糖 他们可能还不了解成功使用它的细节 看来我们应该使用thi
  • Angular ui-router...显示默认选项卡

    我抵达bookDetails状态形成一些其他链接 这里bookDetails州的模板有不同的链接tabs 或模板 及相关控制器EditBookController有一个 json 文件 我使用它以不同的方式构建表单tabs与像这样的国家bo
  • 有没有办法在动态/扩展中执行链接空检查?

    C 有有用的空条件运算符 很好地解释了这个答案 too 我想知道当我的对象是动态 扩展对象时是否可以进行类似的检查 让我向您展示一些代码 给定这个类的层次结构 public class ClsLevel1 public ClsLevel2
  • 从Redis读取数据到Flink

    我一直在尝试找到一个连接器来将数据从Redis读取到Flink Flink 的文档包含用于写入 Redis 的连接器的描述 我需要在 Flink 作业中从 Redis 读取数据 在使用 Apache Flink 进行数据流传输 Fabian