Cloud Dataflow - Dataflow 如何实现并行性?

2023-12-03

我的问题是,在幕后,对于逐元素 Beam DoFn (ParDo),云数据流如何并行工作负载?例如,在我的 ParDO 中,我向外部服务器发送一个针对一个元素的 http 请求。我使用了 30 个工人,每个工人有 4vCPU。

  1. 这是否意味着每个工作线程最多有 4 个线程?
  2. 这是否意味着每个工作人员只需要 4 个 http 连接,或者如果我保持它们处于活动状态即可建立 4 个 http 连接以获得最佳性能?
  3. 除了使用更多核心或更多工作线程之外,如何调整并行级别?
  4. 使用我当前的设置(30*4vCPU 工作线程),我可以在 http 服务器上建立大约 120 个 http 连接。但服务器和工作人员的资源使用率都非常低。基本上我想让他们通过每秒发送更多请求来更加努力地工作。我应该怎么办...

代码片段来说明我的工作:

public class NewCallServerDoFn extends DoFn<PreparedRequest,KV<PreparedRequest,String>> {


private static final Logger Logger = LoggerFactory.getLogger(ProcessReponseDoFn.class);

private static PoolingHttpClientConnectionManager _ConnManager = null;
private static CloseableHttpClient _HttpClient = null;
private static HttpRequestRetryHandler _RetryHandler = null;
private static  String[] _MapServers = MapServerBatchBeamApplication.CONFIG.getString("mapserver.client.config.server_host").split(",");

@Setup
public void setupHttpClient(){

    Logger.info("Setting up HttpClient");

   //Question: the value of maxConnection below is actually 10, but with 30 worker machines, I can only see 115 TCP connections established on the server side. So this setting doesn't really take effect as I expected.....

    int maxConnection = MapServerBatchBeamApplication.CONFIG.getInt("mapserver.client.config.max_connection");
    int timeout = MapServerBatchBeamApplication.CONFIG.getInt("mapserver.client.config.timeout");

    _ConnManager = new PoolingHttpClientConnectionManager();

    for (String mapServer : _MapServers) {
        HttpHost serverHost = new HttpHost(mapServer,80);
        _ConnManager.setMaxPerRoute(new HttpRoute(serverHost),maxConnection);
    }

    // config timeout
    RequestConfig requestConfig = RequestConfig.custom()
            .setConnectTimeout(timeout)
            .setConnectionRequestTimeout(timeout)
            .setSocketTimeout(timeout).build();

    // config retry
    _RetryHandler = new HttpRequestRetryHandler() {

        public boolean retryRequest(
                IOException exception,
                int executionCount,
                HttpContext context) {

            Logger.info(exception.toString());
            Logger.info("try request: " + executionCount);

            if (executionCount >= 5) {
                // Do not retry if over max retry count
                return false;
            }
            if (exception instanceof InterruptedIOException) {
                // Timeout
                return false;
            }
            if (exception instanceof UnknownHostException) {
                // Unknown host
                return false;
            }
            if (exception instanceof ConnectTimeoutException) {
                // Connection refused
                return false;
            }
            if (exception instanceof SSLException) {
                // SSL handshake exception
                return false;
            }
            return true;
        }

    };

    _HttpClient = HttpClients.custom()
                            .setConnectionManager(_ConnManager)
                            .setDefaultRequestConfig(requestConfig)
                            .setRetryHandler(_RetryHandler)
                            .build();

    Logger.info("Setting up HttpClient is done.");

}

@Teardown
public void tearDown(){
    Logger.info("Tearing down HttpClient and Connection Manager.");
    try {
        _HttpClient.close();
        _ConnManager.close();
    }catch (Exception e){
        Logger.warn(e.toString());
    }
    Logger.info("HttpClient and Connection Manager have been teared down.");
}




@ProcessElement
public void processElement(ProcessContext c) {

    PreparedRequest request = c.element();

    if(request == null)
        return;

    String response="{\"my_error\":\"failed to get response from map server with retries\"}";


    String chosenServer = _MapServers[request.getHardwareId() % _MapServers.length];

    String parameter;
    try {
        parameter = URLEncoder.encode(request.getRequest(),"UTF-8");
    } catch (UnsupportedEncodingException e) {
        Logger.error(e.toString());

        return;
    }

    StringBuilder sb = new StringBuilder().append(MapServerBatchBeamApplication.CONFIG.getString("mapserver.client.config.api_path"))
            .append("?coordinates=")
            .append(parameter);

    HttpGet getRequest = new HttpGet(sb.toString());
    HttpHost host = new HttpHost(chosenServer,80,"http");
    CloseableHttpResponse httpRes;

    try {
        httpRes = _HttpClient.execute(host,getRequest);
        HttpEntity entity = httpRes.getEntity();
        if(entity != null){
            try
            {
                response = EntityUtils.toString(entity);
            }finally{
                EntityUtils.consume(entity);
                httpRes.close();
            }
        }
    }catch(Exception e){
        Logger.warn("failed by get response from map server with retries for " + request.getRequest());
    }

    c.output(KV.of(request, response));

}
}

  1. 是的,基于此answer.
  2. 不,您可以建立更多联系。根据我的answer,您可以使用异步http客户端来获得更多并发请求。正如这个答案也描述的那样,您需要收集这些异步调用的结果,并在任何@ProcessElement or @FinishBundle.
  3. See 2.
  4. 由于您的资源使用率较低,这表明工作线程大部分时间都在等待响应。我认为通过上述方法,您可以更好地利用您的资源,并且可以用更少的员工实现相同的性能。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Cloud Dataflow - Dataflow 如何实现并行性? 的相关文章

  • Dataflow 作业完成时通知 Google PubSub

    有没有办法在 Google Dataflow 作业完成后将消息发布到 Google Pubsub 上 我们需要通知依赖系统传入数据的处理已完成 将数据写入到接收器后 Dataflow 如何发布 EDIT 我们希望在管道完成写入 GCS 后发
  • Cloud Dataflow 中的作业失败:启用 Dataflow API

    我目前正在尝试将 Dataflow 与 Pub Sub 结合使用 但收到此错误 工作流程失败 原因 6e74e8516c0638ca 刷新您的凭据时出现问题 请检查 1 为您的项目启用Dataflow API 2 您的项目有一个机器人服务帐
  • 可以使用数据流将 pubsub 消息重复数据删除回 pubsub 吗?

    我有一个将数据写入 Google Cloud pubsub 的应用程序 根据 pubsub 的文档 由于重试机制而导致的重复偶尔可能会发生 还有消息乱序的问题 这在 pubsub 中也得不到保证 另外 根据文档 可以使用 Google Cl
  • 是否可以同时将 Pub/Sub 和 BigQuery 作为 Google Dataflow 中的输入?

    在我的项目中 我希望在 Google Dataflow 中使用流式传输管道来处理 Pub Sub 消息 在清理输入数据时 我还希望获得来自 BigQuery 的侧面输入 这提出了一个问题 将导致两个输入之一无法工作 我在管道选项中设置了st
  • 旁加载静态数据

    在 ParDo 中处理数据时 我需要使用存储在 Google Cloud Storage 上的 JSON 架构 我想这可能是侧面加载 我读了他们称之为文档的页面 https beam apache org releases pydoc 2
  • Apache Beam 中的异步 API 调用

    正如标题所说 我想使用 python 在 apache beam 中进行异步 API 调用 目前 我正在为 Pcollection 中的每个元素调用 DoFn 内的 API 自由度代码 class textapi call beam DoF
  • ParDo 中的侧面输出 | Apache Beam Python SDK

    由于该文档仅适用于 JAVA 我无法真正理解它的含义 它指出 虽然 ParDo 始终生成一个主输出 PCollection 作为 apply 的返回值 但您也可以让 ParDo 生成任意数量的附加输出 PCollection 如果您选择有多
  • Apache Beam:DoFn 与 PTransform

    Both DoFn and PTransform是一种定义操作的方法PCollection 我们如何知道何时使用哪个 理解它的一个简单方法是类比map f 对于列表 高阶函数map将函数应用于列表的每个元素 返回结果的新列表 您可以将其称为
  • 在 Google Data Flow 上的 Spring Boot 项目中运行 Apache Beam 管道

    我正在尝试在 Google Data Flow 上的 Spring Boot 项目中运行 Apache Beam 管道 但我一直遇到此错误Failed to construct instance from factory method Da
  • 从 Dataflow 中的 BigQuery 读取时设置 MaximumBillingTier

    当我从 BigQuery 读取数据作为查询结果时 我正在运行 GCP Dataflow 作业 我正在使用 google cloud dataflow java sdk all 版本 1 9 0 设置管道的代码片段如下所示 PCollecti
  • 压缩保存在Google云存储中的文件

    是否可以压缩已保存在 Google 云存储中的文件 这些文件由 Google 数据流代码创建和填充 数据流无法写入压缩文件 但我的要求是将其保存为压缩格式 标准 TextIO Sink 不支持写入压缩文件 因为从压缩文件中读取的可扩展性较差
  • 将新文件添加到 Cloud Storage 时触发 Dataflow 作业

    我想在将新文件添加到存储桶时触发数据流作业 以便处理新数据并将其添加到 BigQuery 表中 我看到云函数可以被触发 https cloud google com functions calling google cloud storag
  • 在 Apache Beam 中连接行

    我无法理解 Apache Beam 中的连接 例如http www waitingforcode com apache beam joins apache beam read http www waitingforcode com apac
  • bigquery DataFlow 错误:在 EU 中读写时无法在不同位置读写

    我有一个简单的 Google DataFlow 任务 它从 BigQuery 表中读取数据并写入另一个表 如下所示 p beam io Read beam io BigQuerySource query select dia import
  • Apache Beam Pipeline 写表后查询表

    我有一个 Apache Beam Dataflow 管道 它将结果写入 BigQuery 表 然后我想查询该表以获取管道的单独部分 但是 我似乎无法弄清楚如何正确设置此管道依赖性 我编写的新表 然后想要查询 与一个单独的表连接以进行某些过滤
  • 使用谷歌云数据流PubSubIO,消息的读取何时得到确认?

    是否可以延迟确认 直到成功处理子图 PubSubIO Read 下面的所有内容 例如 我们是流媒体从 google pubsub 订阅中读取数据 然后将文件写入 GCS 在另一个分支中 我们使用 BigQueryIO Write 写入 Bi
  • Spring 与 Apache Beam

    我想将 Spring 与 Apache Beam 结合使用 它将在 Google Cloud Data flow Runner 上运行 数据流作业应该能够在执行管道步骤时使用 Spring 运行时应用程序上下文 我想在 Apache Bea
  • 将侧输入应用于 Apache Beam 中的 BigQueryIO.read 操作

    有没有办法将侧面输入应用于 Apache Beam 中的 BigQueryIO read 操作 举例来说 我在 PCollection 中有一个值 我想在查询中使用该值从 BigQuery 表中获取数据 使用侧面输入可以吗 或者在这种情况下
  • detectorClassPathResourcesToStage - 无法转换 url

    当我在 GCE 中运行 jar 时 出现以下错误 java jar mySimple jar project myProjcet Aug 13 2015 1 22 26 AM com google cloud dataflow sdk ru
  • 使用 Apache Beam 的 Dataflow 批量加载的性能问题

    我正在对数据流批量加载进行性能基准测试 发现与 Bigquery 命令行工具上的相同负载相比 加载速度太慢 文件大小约为 20 MB 包含数百万条记录 我尝试了不同的机器类型并获得了最佳的负载性能n1 highmem 4加载目标 BQ 表的

随机推荐

  • 静态方法无法访问实例字段?

    我读过很多关于静态字段的文章 静态方法无法访问实例字段 字段 因为实例字段仅存在于该类型的实例上 但我们可以在静态类中创建和访问实例字段 请找到下面的代码 class Program static void Main string args
  • 通过将集合划分为两个子集来查找可以由集合形成的最大总和

    说明 Given a set of numbers S Find maximum sum such that Sum A1 Sum A2 Where A1 S and A2 S and A1 A2 And Sum X is the sum
  • 在 Android 中向 SQLite 表添加一列?

    我想向现有 SQLite 数据库的表中添加另一列 这可能吗 还是我需要做一些特定的事情来升级它 如果是这样 我该如何去做呢 Use the 修改表命令 ALTER TABLE my table ADD COLUMN new column
  • 从 Google 通讯录获取 Google+ ID

    我正在使用 Google Contacts API 提取用户的联系人电子邮件地址和姓名 有没有办法也获得这些人的 Google ID 联系人 API 将返回profile链接 如果 G 个人资料与联系人条目链接 这是一个例子
  • PostgreSQL 中的级联删除

    我有一个数据库 其中有几十个与外键互连的表 一般情况下我想要默认的ON DELETE RESTRICT这些约束的行为 但是 当尝试与顾问共享数据库快照时 我需要删除一些敏感数据 我希望我的记忆DELETE FROM Table CASCAD
  • Laravel Eloquent - 查询数据透视表

    在我的 Laravel 应用程序中 我有三个数据库表 分别称为用户 项目和角色 它们之间存在 m n 关系 因此我还有名为 project user role 的数据透视表 数据透视表包含 user id project id 和 role
  • 如何在执行 MSTest 测试期间写入 Console.Out

    Context 控制台输出未出现是因为后端代码未在测试上下文中运行 你可能最好使用Trace WriteLine 在 System Diagnostics 中 然后添加写入文件的跟踪侦听器 本主题来自MSDN展示了一种执行此操作的方法 根据
  • 使用 XHR2 请求而不是 cordova-file-transfer 将二进制数据下载到应用程序沙箱中

    Cordova 正在 日落 即将弃用 cordovan plugin file 请参阅他们的博文 Cordova 开发社区不会再对文件传输插件进行更多工作 如果您愿意 您可以继续使用文件传输插件 在可预见的将来它应该可以正常工作 我们强烈建
  • Python虚拟机需要CPU来执行字节码吗?

    Python虚拟机需要CPU来执行字节码吗 字节码是否转换为机器码 然后CPU参与该过程 为了在任何计算机上运行应用程序 其代码必须始终以某种方式转换为机器代码 然后由 CPU 执行 问题在于这种情况何时以及如何发生 让我尝试向您展示 Py
  • 在泽西岛调用 SOAP

    我有一个客户的要求 希望围绕 SOAP Web 服务编写一个包装器 REST Web 服务 我对 SOAP 和 REST 都很陌生 谁能告诉我 我们是否可以在 REST Web 服务中调用 SOAP Web 服务 如果是的话 那么在 Jer
  • javascript 将数字除以小数

    我怎样才能将数字 钱 平均除以x数 该数字可以包含一位或两位小数 也可以不包含小数 such as 1000 or 100 2 or 112 34我希望能够将该数字平等地分成 x 部分 但是如果它不是奇数 则将额外的数字添加到最后一个数字
  • 在现有 SqlConnection 中打开 DbContext 连接

    我感兴趣是否打开实体框架DbContext现有 ADO NET 中的连接SqlConnection如果它们都使用相同的连接字符串 即在完全相同的数据库上操作 那么应该不鼓励吗 例如 using TransactionScope scope
  • 将 jRadioButton 添加到 jTable 中

    我正在尝试添加jRadioButton into jTable 我使用了给定的代码 private class CustomCellRenderer extends DefaultTableCellRenderer non Javadoc
  • 将 CSV 字符串与 IN 运算符一起使用时出错

    当我运行以下代码时 declare aaa nvarchar 10 set aaa 1 2 3 Select from Customer where CustomerId in convert nvarchar aaa 10 我收到此错误
  • 如何在 Django 中创建模型包

    拥有相当大的models py文件 包含多个模型 我正在尝试重构 每个文件一个模型 因此我试图创建一个models包 结构如下 app models init py app models first model py app models
  • 使用jquery取消选中复选框时隐藏文本

    默认情况下会选中复选框 如果未选中 他们应该隐藏文本 如何隐藏或显示 jquery 中的文本 html div class check p p div
  • SQLiteException 没有被捕获

    我试图捕获 android database sqlite SQLiteException 错误代码 5 数据库已锁定 异常 try db insert mytable null myvalues catch SQLiteException
  • 如何在 dplyr 中按降序排列奇数,按升序排列偶数

    我在 r 中有以下数据框 ID bay row number 1 43 11 ABC 2 43 6 DEF 3 43 13 QWE 4 43 15 XDF 5 43 4 VGH 6 43 2 TYU 7 11 11 QAS 8 11 13
  • SQL Server - 不聚合的行到列

    我的数据看起来像这样 address id 12AnyStreet 1234 12AnyStreet 1235 12AnyStreet 1236 12AnyStreet 1237 我的目标是让它看起来像这样 Address id1 id2
  • Cloud Dataflow - Dataflow 如何实现并行性?

    我的问题是 在幕后 对于逐元素 Beam DoFn ParDo 云数据流如何并行工作负载 例如 在我的 ParDO 中 我向外部服务器发送一个针对一个元素的 http 请求 我使用了 30 个工人 每个工人有 4vCPU 这是否意味着每个工