我一直在尝试找到一个连接器来将数据从Redis读取到Flink。 Flink 的文档包含用于写入 Redis 的连接器的描述。我需要在 Flink 作业中从 Redis 读取数据。在使用 Apache Flink 进行数据流传输,Fabian 提到可以从 Redis 读取数据。可用于此目的的连接器是什么?
我们正在生产中运行一个大致如下所示的
class RedisSource extends RichSourceFunction[SomeDataType] {
var client: RedisClient = _
override def open(parameters: Configuration) = {
client = RedisClient() // init connection etc
}
@volatile var isRunning = true
override def cancel(): Unit = {
isRunning = false
client.close()
}
override def run(ctx: SourceContext[SomeDataType]): Unit = while (isRunning) {
for {
data <- ??? // get some data from the redis client
} yield ctx.collect(SomeDataType(data))
}
}
我认为这实际上取决于您需要从 Redis 获取什么。上面的代码可用于从列表/队列中获取消息,转换/推送,然后将其从队列中删除。
Redis 还支持 Pub/Sub,因此可以订阅、获取 SourceConext 并向下游推送消息。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)