Apache Flink AWS S3 Sink 是否需要 Hadoop 进行本地测试?

2024-03-06

我对 Apache Flink 比较陌生,我正在尝试创建一个简单的项目,将文件生成到 AWS S3 存储桶。根据文档,我似乎需要安装 Hadoop 才能执行此操作。

如何设置本地环境来测试此功能?我在本地安装了 Apache Flink 和 Hadoop。我已对 Hadoop 的 core-site.xml 配置添加了必要的更改,并将 HADOOP_CONF 路径添加到了 flink.yaml 配置中。当我尝试通过 Flink UI 在本地提交作业时,我总是收到错误

2016-12-29 16:03:49,861 INFO  org.apache.flink.util.NetUtils                                - Unable to allocate on port 6123, due to error: Address already in use
2016-12-29 16:03:49,862 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Failed to run JobManager.
java.lang.RuntimeException: Unable to do further retries starting the actor system
    at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2203)
    at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2143)
    at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:2040)
    at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)

我假设我在环境设置方面遗漏了一些东西。可以在本地执行此操作吗?任何帮助,将不胜感激。


虽然您需要 Hadoop 库,但您不必安装 Hadoop 即可在本地运行并写入 S3。我只是碰巧尝试编写基于 Avro 模式的 Parquet 输出并生成 SpecificRecord 到 S3。我正在通过 SBT 和 Intellij Idea 在本地运行以下代码的版本。所需零件:

1) 使用以下文件指定所需的 Hadoop 属性(注意:不建议定义 AWS 访问密钥/秘密密钥。最好在具有适当 IAM 角色以读取/写入 S3 存储桶的 EC2 实例上运行。但需要本地进行测试)

<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <!-- Comma separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>

    <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
    <property>
        <name>fs.s3a.access.key</name>
        <value>YOUR_ACCESS_KEY</value>
    </property>

    <!-- set your AWS access key -->
    <property>
        <name>fs.s3a.secret.key</name>
        <value>YOUR_SECRET_KEY</value>
    </property>
</configuration>

2)进口: 导入 com.uebercomputing.eventrecord.EventOnlyRecord

import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
import org.apache.flink.api.scala.{ExecutionEnvironment, _}

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job

import org.apache.parquet.avro.AvroParquetOutputFormat

3)Flink代码使用具有上述配置的HadoopOutputFormat:

    val events: DataSet[(Void, EventOnlyRecord)] = ...

    val hadoopConfig = getHadoopConfiguration(hadoopConfigFile)

    val outputFormat = new AvroParquetOutputFormat[EventOnlyRecord]
    val outputJob = Job.getInstance

    //Note: AvroParquetOutputFormat extends FileOutputFormat[Void,T]
    //so key is Void, value of type T - EventOnlyRecord in this case
    val hadoopOutputFormat = new HadoopOutputFormat[Void, EventOnlyRecord](
      outputFormat,
      outputJob
    )

    val outputConfig = outputJob.getConfiguration
    outputConfig.addResource(hadoopConfig)
    val outputPath = new Path("s3://<bucket>/<dir-prefix>")
    FileOutputFormat.setOutputPath(outputJob, outputPath)
    AvroParquetOutputFormat.setSchema(outputJob, EventOnlyRecord.getClassSchema)

    events.output(hadoopOutputFormat)

    env.execute

    ...

    def getHadoopConfiguration(hadoodConfigPath: String): HadoopConfiguration = {
      val hadoopConfig = new HadoopConfiguration()
      hadoopConfig.addResource(new Path(hadoodConfigPath))
      hadoopConfig
    }

4)构建依赖项和使用的版本:

    val awsSdkVersion = "1.7.4"
    val hadoopVersion = "2.7.3"
    val flinkVersion = "1.1.4"

    val flinkDependencies = Seq(
      ("org.apache.flink" %% "flink-scala" % flinkVersion),
      ("org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion)
    )

    val providedFlinkDependencies = flinkDependencies.map(_ % "provided")

    val serializationDependencies = Seq(
      ("org.apache.avro" % "avro" % "1.7.7"),
      ("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"),
      ("org.apache.parquet" % "parquet-avro" % "1.8.1")
    )

    val s3Dependencies = Seq(
      ("com.amazonaws" % "aws-java-sdk" % awsSdkVersion),
      ("org.apache.hadoop" % "hadoop-aws" % hadoopVersion)
    )

编辑使用 writeAsText 到 S3:

1) 创建一个 Hadoop 配置目录(将其引用为 hadoop-conf-dir),其中包含文件 core-site.xml。

例如:

mkdir /home/<user>/hadoop-config
cd /home/<user>/hadoop-config
vi core-site.xml

#content of core-site.xml 
<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <!-- Comma separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>

    <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
    <property>
        <name>fs.s3a.access.key</name>
        <value>YOUR_ACCESS_KEY</value>
    </property>

    <!-- set your AWS access key -->
    <property>
        <name>fs.s3a.secret.key</name>
        <value>YOUR_SECRET_KEY</value>
    </property>
</configuration>

2) 创建一个目录(将其引用为 flink-conf-dir),其中包含文件 flink-conf.yaml。

例如:

mkdir /home/<user>/flink-config
cd /home/<user>/flink-config
vi flink-conf.yaml

//content of flink-conf.yaml - continuing earlier example
fs.hdfs.hadoopconf: /home/<user>/hadoop-config

3) 编辑用于运行 S3 Flink 作业的 IntelliJ Run 配置 - 运行 - 编辑配置 - 并添加以下环境变量:

FLINK_CONF_DIR and set it to your flink-conf-dir

Continuing the example above:
FLINK_CONF_DIR=/home/<user>/flink-config

4) 使用该环境变量集运行代码:

events.writeAsText("s3://<bucket>/<prefix-dir>")

env.execute
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache Flink AWS S3 Sink 是否需要 Hadoop 进行本地测试? 的相关文章

  • CORS。预签名 URL。 S3

    我已经生成了一个预签名的 S3 POST URL 使用返回参数 然后将其传递到我的代码中 但我不断收到此错误Response to preflight request doesn t pass access control check No
  • Apache Flink 中的并行度

    我可以为 Flink 程序中任务的不同部分设置不同的并行度吗 例如 Flink 如何解释以下示例代码 两个自定义实践者MyPartitioner1 MyPartitioner2 将输入数据划分为两个4和2个分区 partitionedDat
  • Django - 获取 PIL 图像保存方法以与 Amazon s3boto 存储一起使用

    为了在上传时调整图像大小 使用 PIL 我重写了文章模型的保存方法 如下所示 def save self super Article self save if self image size 160 160 image Image open
  • 使用 Java API 在 Hadoop 中移动文件?

    我想使用 Java API 在 HDFS 中移动文件 我想不出办法做到这一点 FileSystem 类似乎只想允许在本地文件系统之间移动 但我想将它们保留在 HDFS 中并将它们移动到那里 我错过了一些基本的东西吗 我能想到的唯一方法是从输
  • S3A:失败,而 S3:在 Spark EMR 中工作

    我将 EMR 5 5 0 与 Spark 结合使用 如果我使用一个简单的文件写入 s3s3 网址写得很好 但如果我使用s3a 地址 它失败了Service Amazon S3 Status Code 403 Error Code Acces
  • 如何在不安装 AWS SDK 的情况下通过 Powershell 从 S3 下载文件?

    我想使用 Windows Powershell 从我的 AWS S3 存储桶下载文件 我无法安装任何 AWS 软件 需要创建一个 API 才能访问 AWS S3 中的文件 我使用Postman测试该文件是否可访问并且成功 鉴于这一成功 我尝
  • Apache Spark 从 S3 读取异常:内容长度分隔消息正文过早结束(预期:2,250,236;收到:16,360)

    我想从 S3 资源创建 Apache Spark DataFrame 我在 AWS 和 IBM S3 Clout 对象存储上尝试过 都失败了 org apache spark util TaskCompletionListenerExcep
  • IOPS(在 Amazon EBS 中)在实践中意味着什么?

    我有一些应用程序所需的图像 图像很多 50 000 但整体大小很小 40 Mb 最初 我以为我会简单地使用 S3 但上传速度非常慢 作为临时解决方案 我想附加一个包含图像的 EBS 这样就可以了 然而 在阅读了一些有关 EBS 通用 gp2
  • S3 REST API 和 POST 方法

    我在用着AWS S3 REST API http docs aws amazon com AmazonS3 latest API APIRest html 在解决了一些令人烦恼的签名问题后 它似乎可以工作 但是 当我使用正确的 REST 动
  • 源访问身份 (OAI) 与 CloudFront 签名 URL 之间的关系

    因此 我一直在遵循有关 CloudFront 和 S3 的指南 但我觉得我仍然缺少原始访问身份 OAI 和 CloudFront 签名 URL 之间关系的核心信息 我想要的是 一个私有 CDN 用于托管音频片段 几秒长 和低分辨率图像 我只
  • 在 Apache Spark 上下文中,内存数据存储意味着什么?

    我读到 Apache Spark 将数据存储在内存中 然而 Apache Spark 旨在分析大量数据 又称大数据分析 在这种情况下 内存数据存储的真正含义是什么 它可以存储的数据是否受到可用 RAM 的限制 它的数据存储与使用HDFS的A
  • sqoop 通过 oozie 导出失败

    我正在尝试将数据导出到mysql from hdfs通过sqoop 我可以通过 shell 运行 sqoop 并且它工作正常 但是当我通过调用oozie 它出现以下错误并失败 我还包括了罐子 没有描述性日志 sqoop脚本 export c
  • 如何用snappy解压hadoop的reduce输出文件尾?

    我们的 hadoop 集群使用 snappy 作为默认编解码器 Hadoop作业减少输出文件名就像part r 00000 snappy JSnappy 无法解压缩文件 bcz JSnappy 需要以 SNZ 开头的文件 归约输出文件以某种
  • 以编程方式读取 Hadoop Mapreduce 程序的输出

    这可能是一个基本问题 但我在谷歌上找不到答案 我有一个映射缩减作业 它在其输出目录中创建多个输出文件 我的 Java 应用程序在远程 hadoop 集群上执行此作业 作业完成后 需要使用以下命令以编程方式读取输出org apache had
  • 伪模式下没有名称节点错误

    我是hadoop新手 正处于学习阶段 根据 Hadoop Definitve 指南 我已将 hadoop 设置为伪分布式模式 一切正常 昨天我什至能够执行第三章中的所有示例 今天 当我重新启动我的unix并尝试运行start dfs sh然
  • 将文件上传到S3的模拟测试用例

    我们如何模拟文件上传到 S3 我尝试过这样的事情 file mock mock MagicMock spec File name FileMock mock patch storages backends s3boto S3BotoStor
  • Hadoop 超立方体

    嘿 我正在启动一个基于 hadoop 的超立方体 具有灵活的维度数 有人知道这方面现有的方法吗 我刚刚发现PigOLAP草图 http wiki apache org pig PigOLAPSketch 但没有代码可以使用它 另一种方法是Z
  • 同一区域内但属于不同账户的AWS数据传输费用如何?

    如果 S3 gt EC2 或 EC2 gt EC2 位于同一 AWS 区域 则数据传输费用似乎是免费的 S3价格注意事项 您需要为进出 Amazon S3 的所有带宽付费 except对于以下情况 当Amazon Elastic Compu
  • Hadoop 作业:任务在 601 秒内无法报告状态

    在伪节点上运行 hadoop 作业时 任务失败并被杀死 错误 任务尝试 在 601 秒内无法报告状态 但同一个程序正在通过 Eclipse 运行 本地作业 任务 大约有 25K 个关键字 输出将是所有可能的组合 一次两个 即大约 25K 2
  • 如何使用 C# / .Net 将文件列表从 AWS S3 下载到我的设备?

    我希望下载存储在 S3 中的多个图像 但目前如果我只能下载一个就足够了 我有对象路径的信息 当我运行以下代码时 出现此错误 遇到错误 消息 读取对象时 访问被拒绝 我首先做一个亚马逊S3客户端基于我的密钥和访问配置的对象连接到服务器 然后创

随机推荐