是否可以将 Riak CS 与 Apache Flink 一起使用?

2024-01-03

我要配置filesystem状态后端和zookeeper恢复模式:

state.backend: filesystem
state.backend.fs.checkpointdir: ???

recovery.mode: zookeeper
recovery.zookeeper.storageDir: ???

正如你所看到的,我应该指定checkpointdir and storageDir参数,但我没有 Apache Flink 支持的任何文件系统(例如 HDFS 或 Amazon S3)。但我已经安装了Riak CS集群(好像是这样兼容S3 http://docs.basho.com/riakcs/latest/).

那么,我可以将 Riak CS 与 Apache Flink 一起使用吗?如果可能:如何配置 Apache Flink 与 Riak CS 配合使用?


答:如何加入 Apache Flink 和 Riak CS?

Riak CS 具有 S3(版本 2)兼容接口。因此,可以使用 Hadoop 的 S3 文件系统适配器与 Riak CS 配合使用。

我不知道为什么,但 Apache Flink 在 fat jar 中只有部分 Hadoop 文件系统适配器(lib/flink-dist_2.11-1.0.1.jar)即它有 FTP 文件系统(org.apache.hadoop.fs.ftp.FTPFileSystem)但没有 S3 文件系统(即org.apache.hadoop.fs.s3a.S3AFileSystem)。所以,你有两种方法来解决这个问题:

  • 使用 Hadoop 安装中的这些适配器。我没有尝试这个,但似乎你应该只配置 HADOOP_CLASSPATH 或 HADOOP_HOME evn 变量。
  • Monky 修补 Apache Flink 并下载所需的 JAR<flink home>/lib目录

因此,我选择第二种方式,因为不想在我的环境中配置 Hadoop。您可以从 Hadoop dist 或互联网复制 JAR:

curl http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.2/hadoop-aws-2.7.2.jar -o /flink/lib/hadoop-aws-2.7.2.jar
curl http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar -o /flink/lib/aws-java-sdk-1.7.4.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.2.5/httpcore-4.2.5.jar -o /flink/lib/httpcore-4.2.5.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.2.5/httpclient-4.2.5.jar -o /flink/lib/httpclient-4.2.5.jar

正如你所看到的,我使用的是旧版本,因为这个版本在 Hadoop 2.7.2 中使用,并且我使用与这个版本的 Hadoop 兼容的 Flink。

仅供参考:如果您在自己的流程中使用这些 JAR 的最新版本,此类 hack 可能会导致问题。为了避免与不同版本相关的问题,您可以在使用流构建 fat jar 时重新定位包,使用类似的东西(我正在使用 Gradle):

// Relocate org.apache.http packages because Apache Flink include old version of this library (we place them for using S3 compatible FS)
shadowJar {
    dependencies {
        include(dependency('.*:.*:.*'))
    }

    relocate 'org.apache.http', 'relocated.org.apache.http'
    relocate 'org.apache.commons', 'relocated.org.apache.commons'
}

然后你应该指定路径core-site.xml in flink-conf.yaml因为 Hadoop 兼容文件系统使用此配置来加载设置:

...
fs.hdfs.hadoopconf: /flink/conf
...

正如你所看到的,我只是把它放在<fink home>/conf目录。它有以下设置:

<?xml version="1.0" encoding="UTF-8" ?>
<configuration>
    <property>
        <name>fs.s3a.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> // because S3A better then other: https://wiki.apache.org/hadoop/AmazonS3
    </property>
    <property>
        <name>fs.s3a.endpoint</name>
        <value>my-riak-cs.stage.local</value>  // this is my Riak CS host
    </property>
    <property>
        <name>fs.s3a.connection.ssl.enabled</name> // my Riak CS in staging doesn't support SSL
        <value>false</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <value>????</value> // this is my access key for Riak CS
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value>????</value> // this is my secret key for Riak CS
    </property>
</configuration>

然后你应该配置 Riak CS 存储桶flink-conf.yaml作为推荐人here https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/jobmanager_high_availability.html#standalone-cluster-high-availability:

...
state.backend.fs.checkpointdir: s3a://example-staging-flink/checkpoints
...
recovery.zookeeper.storageDir: s3a://example-staging-flink/recovery
...

并在 Riak CS 中创建存储桶。我在用s3cmd(安装在brew在我的 OS X 开发环境中):

s3cmd mb s3://example-staging-flink

仅供参考:使用前s3cmd你应该配置它使用s3cmd --configure然后修复一些设置~/.s3cmd file:

signature_v2 = True // because Riak CS using S3 V2 interface
use_https = False // if your don't use SSL
access_key = ???
secret_key = ???
host_base = my-riak-cs.stage.local // your Riak CS host
host_bucket = %(bucket).my-riak-cs.stage.local // format of bucket used by Riak CS

因此,这就是您在 Riak CS 中为独立 HA Apache Flink 集群的保存/恢复状态应该配置的所有内容。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

是否可以将 Riak CS 与 Apache Flink 一起使用? 的相关文章

  • 如何覆盖 Apache Flink 中的配置值?

    我正在尝试将 Apache Flink 的指标收集到 Prometheus 中 Flink 文档说我需要将以下行添加到我的 flink conf yaml 中 metrics reporter promgateway class org a
  • Apache Flink 与 Elasticsearch 集成

    我正在尝试将 Flink 与 Elasticsearch 2 1 1 集成 我正在使用 Maven 依赖项
  • Flink 处理事件太慢

    我使用 Kinesis 数据流作为源 使用 elasticsearch 作为接收器 在 AWS Kinesis Data 分析应用程序中运行 Flink 作业 事件示例 area sessions userId 4450 date 2021
  • logback 在 Flink 中不起作用

    我有一个单节点 Flink 实例 它在 lib 文件夹中具有 logback 所需的 jar logback classic jar logback core jar log4j over slf4j jar 我已从 lib 文件夹中删除了
  • Flink 中复杂拓扑(多输入)的集成测试

    我需要为 flink 流拓扑编写单元测试 这基本上是一个CoFlatMapFunction 并且它有 2 个输入 我尝试从这个页面中获得一些灵感 https ci apache org projects flink flink docs s
  • flink集群启动错误[ERROR]无法正确获取JVM参数

    bin start cluster sh Starting cluster INFO 1 instance s of standalonesession are already running on centos1 Starting sta
  • Flink 中的水印和触发器有什么区别?

    我读到 排序运算符必须缓冲它接收到的所有元素 然后 当它接收到水印时 它可以对时间戳低于水印的所有元素进行排序 并按排序顺序发出它们 这是正确 因为水印表明不能有更多元素到达并与已排序元素混合 https cwiki apache org
  • Python + Beam + Flink

    我一直在尝试让 Apache Beam 可移植性框架与 Python 和 Apache Flink 一起使用 但我似乎找不到一套完整的指令来让环境正常工作 是否有任何参考资料包含使简单的 python 管道正常工作的先决条件和步骤的完整列表
  • Apache Flink AWS S3 Sink 是否需要 Hadoop 进行本地测试?

    我对 Apache Flink 比较陌生 我正在尝试创建一个简单的项目 将文件生成到 AWS S3 存储桶 根据文档 我似乎需要安装 Hadoop 才能执行此操作 如何设置本地环境来测试此功能 我在本地安装了 Apache Flink 和
  • 在 Flink 中,我可以在同一个槽中拥有一个算子的多个子任务吗?

    探索Apache Flink几天了 对Task Slot的概念有些疑惑 虽然有人问了几个问题 但有一点我不明白 我正在使用一个玩具应用程序进行测试 运行本地集群 我已禁用运算符链接 我从文档中知道插槽允许内存隔离而不是 CPU 隔离 阅读文
  • Apache Flink - 作业内部无法识别自定义 java 选项

    我已将以下行添加到 flink conf yaml 中 env java opts Ddy props path PATH TO PROPS FILE 启动 jobmanager jobmanager sh start cluster 时
  • 基于流的应用程序中的受控/手动错误/恢复处理

    我正在开发一个基于的应用程序Apache Flink 它利用Apache Kafka用于输入和输出 该应用程序可能会被移植到Apache Spark 所以我也将其添加为标签 问题仍然相同 我要求通过 kafka 接收的所有传入消息必须按顺序
  • 尝试升级到 flink 1.3.1 时出现异常

    我尝试将集群中的 flink 版本升级到 1 3 1 以及 1 3 2 但我的任务管理器中出现以下异常 2018 02 28 12 57 27 120 ERROR org apache flink streaming runtime tas
  • 创建具有通用返回类型的 FlinkSQL UDF

    我想定义函数MAX BY接受类型值T和类型的订购参数Number并根据排序从窗口返回最大元素 类型为T 我试过了 public class MaxBy
  • Apache Flink - “keyBy”中的异常处理

    由于代码错误或缺乏验证 进入 Flink 作业的数据可能会触发异常 我的目标是提供一致的异常处理方式 我们的团队可以在 Flink 作业中使用这种方式 而不会导致生产中出现任何停机 重启策略似乎不适用于此处 因为 简单的重启无法解决问题 我
  • Flink 窗口:聚合并输出到接收器

    我们有一个数据流 其中每个元素都是这种类型 id String type Type amount Integer 我们想要聚合这个流并输出总和amount每周一次 目前的解决方案 Flink 管道示例如下所示 stream keyBy ty
  • Flink - 无法从检查点恢复

    我使用一个作业管理器和两个任务管理器在 kubernetes 上运行集群 我通过在作业运行时杀死一个任务管理器 Pod 来测试检查点机制 我在作业管理器和重新启动的任务管理器上遇到以下异常 工作经理例外 java lang Exceptio
  • 2022年Flink可以支持什么Java版本?

    假设我开始一个新的 Flink Java 项目 如果我寻找 稳定的 Flink Java 生产体验 我应该使用哪个版本 官方docs https nightlies apache org flink flink docs master do
  • 我想使用 Flink 的 Streaming File Sink 写入 ORC 文件,但它无法正确写入文件

    我正在从 Kafka 读取数据并尝试将其以 ORC 格式写入 HDFS 文件系统 我使用了他们官方网站上的以下链接参考 但我可以看到Flink为所有数据写入完全相同的内容并生成这么多文件并且所有文件都可以103KB https ci apa
  • 使用 scala 在 Flink 中进行实时流预测

    弗林克版本 1 2 0斯卡拉版本 2 11 8 我想使用 DataStream 来使用 scala 中的 flink 模型进行预测 我在使用 scala 的 flink 中有一个 DataStream String 其中包含来自 kafka

随机推荐

  • 梅文。 “无家可归”的罐子该怎么办?

    我有一些 proprietary jar 需要包含在我的项目中 但我不想将其安装到本地存储库 我最初所做的是将 jar 放入我的项目的版本控制中lib 文件夹 然后将 Maven 依赖项指定为
  • HTML5 画布圆形文本

    如何使用画布创建圆形文本 圆形文本 字母现在应该正确定向 CanvasRenderingContext2D prototype fillTextCircle function text x y radius startRotation va
  • 使用正则表达式进行 Github 搜索

    有没有办法使用正则表达式在 github 存储库中搜索代码 目前 我克隆了存储库并进行搜索 但我想输入类似的内容 s foo gi 并查找代码中所有出现 foo 的地方 foo create foo extend fooBar barFoo
  • 从 SurfaceView 获取图像到 ImageView?

    我在从用作相机预览的 SurfaceView 获取图像 可绘制对象或位图时遇到了一些麻烦 final CameraSurfaceView cameraSurfaceView new CameraSurfaceView this Linear
  • 使用边框创建三角形

    我最近需要创建对话气泡 为了在对话气泡的末端创建小三角形尖端 我使用了CSS技术 http jsfiddle net 66jAA 5 其中元素被赋予0 width and 0 height并给定边界 使某些边框透明会产生对角线 这非常有效
  • 如何在 React 的子功能组件中触发一个动作?

    对于基本的表单 输入布局 很明显应该使用回调来处理从子组件到父组件的状态更改 由子组件发起 但是父组件如何要求子组件重新评估其状态并将其传达回父组件 这里的最终目标只是在提交表单按钮时触发子输入的验证 给定的 ts 代码如下所示 const
  • Go 声明中的“_,”(下划线逗号)是什么?

    我似乎无法理解这种变量声明 prs m example 究竟是什么 他们为什么声明这样的变量而不是 prs m example 我发现它是举例 地图 https gobyexample com maps 它避免了必须为返回值声明所有变量 它
  • 解释一下C++代码

    我可以获得有关以下代码解释的帮助吗 include
  • “复制本地”对于项目引用是否具有传递性?

    沃特 拟议的骗局 因为这里的问题表明了相反的情况链接问题 https stackoverflow com questions 12386523 visual studio not copying content files from ind
  • guice:命令行运行时注入/绑定

    我有以下问题 Inject MyClass Service service this service service public void doSomething service invokeSelf 我有一个模块 bind servic
  • 如何在没有 TCP/IP 堆栈的情况下用 Java 发送以太网帧

    我的 Java 应用程序应该控制直接连接到我的计算机 Ubuntu 和 Windows 网络接口的外部设备 EtherCAT 总线技术 没有连接其他网络设备 通信是在标准 IEEE 802 3 以太网帧上完成的 无需 IP 堆栈 发送数据示
  • 如何在 TensorFlow 中将张量转换为 ndarray?

    我的目标是将张量转换为 ndarray 而不需要 run 或 eval 我想执行与示例相同的操作 A tf constant 5 B tf constant A 1 0 0 但是 ndarray 可以位于 tf constant 内部 但张
  • 如何使用NuGetpackages.config文件?

    I see a 包配置解决方案中我的每个项目的文件 它包含有关各种程序集信息的信息 我希望 NuGet 能够自动扫描这些 packages config 并根据需要进行下载 但事实并非如此 我需要手动安装所有软件包吗 如果右键单击相关项目
  • python pip install 在 Windows 上不起作用

    我在 Windows 上安装了 python 2 7 10 我尝试使用以下命令在命令行上安装 Django C users user myproject gt python pip install django 这会显示以下错误 pytho
  • 更改 UITextView 中一个链接的属性

    我有一个UITextView具有多个 URL 我通过设置激活dataDetectorTypes财产给UIDataDetectorTypeLink 然后我使用linkTextAttributes属性来设置链接的颜色 现在 当用户点击其中一个链
  • 此编码器要求从 initWithCoder 返回替换的对象:

    我的应用程序在 iOS 11 2 上运行良好 但在 iOS 11 3 中会崩溃 我有例外 由于未捕获的异常 NSGenericException 而终止应用程序 原因 此编码器要求从 initWithCoder 返回替换的对象 我有一个带有
  • 通过 golang 中的多个 HTTP 处理程序包含上下文对象

    我刚刚读过这篇博文 http blog golang org error handling and go TOC 3 关于创建函数类型并实现 ServeHTTP 该函数上的方法能够处理错误 例如 type appError struct E
  • 在 SQL 中实现不相交集逼近(并集查找)

    使用 SQL 实现近似不相交集的最佳方法是什么 Details 我有一个边表 存储为两列表 vertex a vertex b 我需要一个不同集合的表 存储为 vertex set id 每个顶点一行 用不相交的 set id 标记每个顶点
  • 应用于行的几何平均值

    我有这个数据框作为例子 Col1 Col2 Col3 Col4 1 2 3 2 2 我想添加名为 Gmean 的第四列 用于计算每行前 3 列的几何平均值 怎样才能完成呢 Thanks 一种方法是Scipy s geometric mean
  • 是否可以将 Riak CS 与 Apache Flink 一起使用?

    我要配置filesystem状态后端和zookeeper恢复模式 state backend filesystem state backend fs checkpointdir recovery mode zookeeper recover