无法执行 HTTP 请求:Flink 中等待来自池的连接超时

2024-01-04

我正在研究一个将一些文件上传到 s3 存储桶的应用程序稍后,它从 s3 存储桶读取文件并将其推送到我的数据库.

我在用着弗林克1.4.2 and fs.s3a API用于从 s3 存储桶读取和写入文件。

将文件上传到 s3 存储桶工作正常,没有任何问题,但是当我的应用程序的第二阶段从 s3 读取这些上传的文件开始时,我的应用程序抛出以下错误:

Caused by: java.io.InterruptedIOException: Reopen at position 0 on s3a://myfilepath/a/b/d/4: org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:125)
at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:155)
at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:281)
at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:364)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:702)
at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:490)
at org.apache.flink.api.common.io.GenericCsvInputFormat.open(GenericCsvInputFormat.java:301)
at org.apache.flink.api.java.io.CsvInputFormat.open(CsvInputFormat.java:53)
at org.apache.flink.api.java.io.PojoCsvInputFormat.open(PojoCsvInputFormat.java:160)
at org.apache.flink.api.java.io.PojoCsvInputFormat.open(PojoCsvInputFormat.java:37)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:145)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)

I was 可以通过增加最大连接数来控制此错误s3a API 的参数。

截至目前,我身边有s3 存储桶中有 1000 个文件这是由我的应用程序推拉在 s3 存储桶中并且我的最大连接数是 3000。我使用 Flink 的并行性从 s3 存储桶上传/下载这些文件。我的任务管理器计数为 14。 这是一间歇性故障,我也有这种情况的成功案例。

我的查询是,

  1. 为什么我会出现间歇性故障?如果我设置的最大连接较低,那么我的应用程序应该在每次运行时抛出此错误。
  2. 有没有什么方法可以计算我的应用程序工作所需的最佳最大连接数,而不会遇到连接池超时错误?或者这个错误是否与我不知道的其他事情有关?

谢谢 提前


根据我通过 Flink(批处理)工作流程处理来自 S3 的大量文件的经验,一些评论:

  1. 当您读取文件时,Flink 将根据文件数量和每个文件的大小计算“分割”。每个分割都是单独读取的,因此理论上最大的同时连接数不是基于文件数,而是基于文件和文件大小的组合。
  2. HTTP 客户端使用的连接池在一段时间后释放连接,因为能够重用现有连接是一个胜利(服务器/客户端握手不必发生)。因此,这给池中可用连接的数量带来了一定程度的随机性。
  3. 连接池的大小不会对内存产生太大影响,因此我通常将其设置得相当高(例如,最近的工作流程为 4096)。
  4. 使用AWS连接代码时,bump的设置为fs.s3.maxConnections,这与纯 Hadoop 配置不同。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

无法执行 HTTP 请求:Flink 中等待来自池的连接超时 的相关文章

随机推荐

  • SQL中如何删除重复记录

    如何删除sql中的重复记录 In SQL Server 2005以上 WITH q AS SELECT ROW NUMBER OVER PARTITION BY dup column ORDER BY dup column AS rn FR
  • 快速滑出菜单而不滑动导航栏(以编程方式)

    几天来我一直在尝试制作左侧滑出菜单 我无法让任何库与我的应用程序配合使用 因此我求助于 raywenderlich 的教程 http www raywenderlich com 78568 create slide out navigati
  • Hibernate映射:一列到多个表

    我有一个针对场景的 最佳实践 问题 设想 DB 中的多个实体 例如 Document BlogPost Wiki 可以由个人共享 不是为每个实体创建共享表 而是创建单个共享表 问题是 如何将共享表与不同的实体进行映射 我有三个选项 请告知哪
  • 如何从 C# 与英特尔新的 DRNG(RDRAND 指令)交互? [关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我希望从 C 程序集中使用英特尔的数字随机数生成器 Ivy Bridge 中的 RDRAND 指令 我查看了 cpp 库 但我希望有一个更
  • 在 UIImageView 中旋转图像

    是否可以仅旋转 UIImageView 中的图像 我正在寻找有关它的信息 但我只找到了如何旋转 UIImageVeiw 的信息 您可以使用以下代码旋转图像 注意 这使用了 CGImageRef 您可以通过以下方式从 UIImage 获取它
  • 获取 Spring bean 的新实例

    我有一个名为MyInterface 实现的类MyInterface 我们称之为MyImplClass 还实现了Runnable接口 以便我可以使用它来实例化线程 这是我现在的代码 for OtherClass obj someList My
  • 是否可以在宏内定义宏?

    我想使用这样的宏参数 define D cond do if cond define YYY 1 else define YYY 0 while 0 是否可以 UPD也许当源被预处理两次时 gcc E source c gcc xc 接下来
  • 是否可以在网络浏览器中模拟 Android“硬件”后退按钮?

    很高兴可以在浏览器中的 Cordova Ionic 应用程序中测试许多案例 但我还没有找到一种假装按下 Android 以前是硬件 后退按钮的方法 如果有一个带有后退按钮或组合键 例如 Alt Ctrl 是否可以用 JavaScript 触
  • 如何禁用Tensorflow的多线程?

    我正在使用不支持多线程的模拟器运行 Tensorflow 程序 我在tensorflow core common runtime local device cc第38行将intra op parallelism threads更改为1 但一
  • 为什么我没有从子进程中获取退出状态?

    我有一个正在管理的 Perl 程序 它能够分叉多个进程 最多达到指定的限制 监视它们 并在它们退出时分叉其他进程 再次达到限制 直到要运行的事物列表完成 它工作正常 除了由于某种原因它似乎没有从我的子进程中获取正确的退出状态 不起作用的代码
  • create-react-app + Nodejs (express) 服务器

    我在我的应用程序中使用 NodeJs React 我在 NodeJs 中使用express 我使用 create react app npm 创建了示例应用程序 我使用 NodeJs 从 React 应用程序调用 oauth 令牌 我在这篇
  • 自删除bash脚本

    bash 脚本如何在遇到删除自身的语句后执行 例如 当我运行 test sh 脚本时 其中包含 lt some commands gt rm test sh lt some more commands gt end 脚本执行到最后才删除自身
  • 检查 mysql_query 是否返回任何结果的最佳方法?

    我正在寻找最好的方法来检查查询中是否返回了任何结果 我感觉这部分代码我写了很多次 有时会出错 有时则不会 例如 我运行此查询来检查用户名是否存在 然后再将新用户名插入数据库 result mysql query SELECT FROM 然后
  • 对矩阵中的列重新排序

    假设我有一个n row m列矩阵A 我想重新排序每一列m根据某些特定行的排序 例如 如果我采取order A k 这给了我列中元素的数字或字母顺序k 我现在想对矩阵中的每一列进行排序A根据这些排名 使元素1 n每行都按顺序对应于元素1 n
  • Mailchimp API /列出 merge_fields 的日期类型参数格式

    我有一个date我需要在 Mailchimp API 的 v3 版本中填充该 merge field 在我找到答案之前 我在谷歌上进行了很多搜索 希望这可以节省某人的时间 预期的格式是 mm dd yyyy 看起来月份和日期的个位数就可以了
  • 仅跟踪嵌套 div 标签中悬停子元素的鼠标移动

    我有一个可以有 n 个嵌套 div 标签的环境 我必须仅在子 div 中跟踪鼠标的鼠标移动时刻 我有以下代码 结果显示在列表中 Problem 如果我附加更多子 div 鼠标移动也会跟踪所有父级 div 我想要的是 仅获取鼠标悬停区域的鼠标
  • 如何将日期构建附加到 gradle 上的 versionNameSuffix

    我正在使用 Android Studio 我需要在后面附加一个后缀versionNameSuffix在我的安卓设备上构建 gradle文件 我有三种不同的构建类型 我只需将日期时间附加到我的 测试版 版本中 我的实际文件是 defaultC
  • 为什么空格会影响 ruby​​ 函数调用?

    我收到此代码的语法错误 render json what gt created whatCreated gt thing htmlOutput gt render to string partial some partial 但使用这段代码
  • 使用 Yfinance 获取市值数据

    我试图使用 yfinance 获取股票的市值数据 这是我的代码 import yfinance as yf import numpy as np from pandas datareader import data import panda
  • 无法执行 HTTP 请求:Flink 中等待来自池的连接超时

    我正在研究一个将一些文件上传到 s3 存储桶的应用程序稍后 它从 s3 存储桶读取文件并将其推送到我的数据库 我在用着弗林克1 4 2 and fs s3a API用于从 s3 存储桶读取和写入文件 将文件上传到 s3 存储桶工作正常 没有