我正在研究一个将一些文件上传到 s3 存储桶的应用程序稍后,它从 s3 存储桶读取文件并将其推送到我的数据库.
我在用着弗林克1.4.2 and fs.s3a API用于从 s3 存储桶读取和写入文件。
将文件上传到 s3 存储桶工作正常,没有任何问题,但是当我的应用程序的第二阶段从 s3 读取这些上传的文件开始时,我的应用程序抛出以下错误:
Caused by: java.io.InterruptedIOException: Reopen at position 0 on s3a://myfilepath/a/b/d/4: org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:125)
at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:155)
at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:281)
at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:364)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:702)
at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:490)
at org.apache.flink.api.common.io.GenericCsvInputFormat.open(GenericCsvInputFormat.java:301)
at org.apache.flink.api.java.io.CsvInputFormat.open(CsvInputFormat.java:53)
at org.apache.flink.api.java.io.PojoCsvInputFormat.open(PojoCsvInputFormat.java:160)
at org.apache.flink.api.java.io.PojoCsvInputFormat.open(PojoCsvInputFormat.java:37)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:145)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
I was 可以通过增加最大连接数来控制此错误s3a API 的参数。
截至目前,我身边有s3 存储桶中有 1000 个文件这是由我的应用程序推拉在 s3 存储桶中并且我的最大连接数是 3000。我使用 Flink 的并行性从 s3 存储桶上传/下载这些文件。我的任务管理器计数为 14。
这是一间歇性故障,我也有这种情况的成功案例。
我的查询是,
- 为什么我会出现间歇性故障?如果我设置的最大连接较低,那么我的应用程序应该在每次运行时抛出此错误。
- 有没有什么方法可以计算我的应用程序工作所需的最佳最大连接数,而不会遇到连接池超时错误?或者这个错误是否与我不知道的其他事情有关?
谢谢
提前
根据我通过 Flink(批处理)工作流程处理来自 S3 的大量文件的经验,一些评论:
- 当您读取文件时,Flink 将根据文件数量和每个文件的大小计算“分割”。每个分割都是单独读取的,因此理论上最大的同时连接数不是基于文件数,而是基于文件和文件大小的组合。
- HTTP 客户端使用的连接池在一段时间后释放连接,因为能够重用现有连接是一个胜利(服务器/客户端握手不必发生)。因此,这给池中可用连接的数量带来了一定程度的随机性。
- 连接池的大小不会对内存产生太大影响,因此我通常将其设置得相当高(例如,最近的工作流程为 4096)。
- 使用AWS连接代码时,bump的设置为
fs.s3.maxConnections
,这与纯 Hadoop 配置不同。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)