Java 期货管道

2023-12-31

我正在努力优化我的Future的管理技术。

假设我们有这种典型的处理场景:我运行一个查询以从数据库中获取一些记录:

SELECT * FROM mytable WHERE mycondition;

该查询返回很多我需要处理的行,例如:

while (recordset have more results) {
    MyRow row = recordset.getNextRow(); // Get the next row
    processRow(row);                    // Process the row
}

现在假设所有行彼此独立,并且函数processRow速度很慢,因为它在 C* 集群上执行一些硬处理和查询:

void processRow(MyRow row) {
    // Fetch some useful data from the DB
    int metadataid = row.getMetadataID();
    Metadata metadata = getMetadataFromCassandra(metadataid);

    // .... perform more processing on the row .....

    // Store the processing result in the DB
    ProcessingResult result = ....;
    insertProcessingResultIntoCassandra(result);
}

像这样的串行方法预计性能很差,因此并行执行是有争议的。

考虑到这个基本的处理结构,以下是我对算法进行的一些转换,以获得主要的速度提升。


第 1 步:并行化行处理

这非常简单。我创建了一个Executor得到的jobs并行完成。然后我等待所有工作完成。代码如下:

ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newCachedThreadPool();
int failedJobs = 0;
ArrayList<Future<Boolean>> futures = new ArrayList<>();
while (recordset have more results) {
    final MyRow row = recordset.getNextRow(); // Get the next row

    // Create the async job and send it to the executor
    Callable<Boolean> c = new Callable<Boolean>() {
            @Override
            public Boolean call() {
                try {
                    processRow(row);
                } catch (Exception e) {
                    return false; // Job failed
                }
                return true; // Job is OK
            }
    };
    futures.add(executor.submit(c));
}

// All jobs submitted. Wait for the completion.
while (futures.size() > 0) {
    Future<Boolean> future = futures.remove(0);
    Boolean result = false;
    try {
        result = future.get();
    } catch (Exception e) {
        e.printStackTrace();
    }
    failedJobs += (result ? 0 : 1);
}

第 2 步:限制并发行数

到目前为止一切顺利,除非我的数量很少jobs这预计会因内存不足错误而失败,因为执行器由未绑定队列支持,并且主循环将一路提交作业。我可以通过控制并发提交作业的最大数量来解决这个问题:

final const int MAX_JOBS = 1000;
while (recordset have more results) {
    ....
    futures.add(executor.submit(c));
    while (futures.size() >= MAX_JOBS) {
        Future<Boolean> future = futures.remove(0);
        Boolean result = false;
        try {
            result = future.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
        failedJobs += (result ? 0 : 1);
    }
}

简单地说,如果达到某个阈值(在本例中为 1000),我会等待列表中的第一项作业完成。这很有效,而且加速效果很好。


第 3 步:并行化单行处理

这是我想获得一些帮助的步骤。我预计1000由于 IO 缓慢,作业将在队列中快速积累。也就是说,我希望 JVM 能够启动1000线程来容纳所有的工作。现在,当你只有 8 核机器时,1000 个线程通常会减慢一切,我认为通过更多调整并行性,这个数字可以降低。

目前,getMetadataFromCassandra函数是一个包装器session.executeAsync,但管理重试:

public static ResultSet getMetadataFromCassandra(...) {
    int retries = 0;

    // Loop here
    while (retries < MAX_RETRIES) {
        // Execute the query
        ResultSetFuture future = session.executeAsync(statement);
        try {
            // Try to get the result
            return future.get(1000 * (int)Math.pow(2, retries), TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            // Ooops. An error occurred. Cancel the future and schedule it again
            future.cancel(true);
            if (retries == MAX_RETRIES) {
                e.printStackTrace();

                String stackTrace = Throwables.getStackTraceAsString(e);
                logToFile("Failed to execute query. Stack trace: " + stackTrace);
            }

            retries++;
        }
    }

    return null;
}

正如你所看到的,这是一个阻塞函数,因为我.get() on the ResultSetFuture。也就是说,这个调用会阻塞每个等待IO的线程。所以我采用了异步方法,但我觉得我浪费了很多硬件资源。


QUESTION

在我看来,当.executeAsync结果可用(或发生超时),“释放”线程并允许同一线程执行其他操作。

简而言之,在我看来,我需要改变顺序的的结构processRow into a pipeline:查询以异步方式执行,当结果可用时,将执行处理的其余部分。当然,我希望主循环等待整个流水线式的完成的过程,而不仅仅是第一部分。

换句话说,主循环提交一个作业(我们称之为jobJob)我得到一个Future(我们打电话jobFuture)我可以.get()等待其完成。然而,jobJob触发“查询”子作业(我们称之为queryJob), and queryJob是异步提交的,所以我得到另一个Future(我们称之为queryFuture)应该用来触发“进程”子作业(让我们调用processJob)。此时,我只是在嵌套Futures并在完成之前阻止链深处Future代表jobJob,这意味着我又回到了原点!!!

在我采取困难的路线并将这种管道实现为有限状态机之前,我查看了:

  • ForkJoinPool执行者类
  • ListenableFuture来自Guava library
  • CompletableFuture class

它们似乎都不能满足我对这个过程进行流水线处理的要求,或者可能我没有找到关于如何执行如此明显的简单任务的明确解释。谁能简单地告诉我这个话题?

非常感谢任何帮助。


None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Java 期货管道 的相关文章

  • 基于代理的模拟:性能问题:Python vs NetLogo & Repast

    我正在 Python 3 中复制一小段 Sugarscape 代理模拟模型 我发现我的代码的性能比 NetLogo 慢约 3 倍 这可能是我的代码的问题 还是Python的固有限制 显然 这只是代码的一个片段 但 Python 却花费了三分
  • Android:捕获的图像未显示在图库中(媒体扫描仪意图不起作用)

    我遇到以下问题 我正在开发一个应用程序 用户可以在其中拍照 附加到帖子中 并将图片保存到外部存储中 我希望这张照片也显示在图片库中 并且我正在使用媒体扫描仪意图 但它似乎不起作用 我在编写代码时遵循官方的Android开发人员指南 所以我不
  • 未捕获 Func<> 的异常(异步)

    我有以下代码 为了进行此重现而进行了简化 显然 catch 异常块将包含更多逻辑 我有以下代码 void Main var result ExecuteAction async gt Will contain real async code
  • 多个 Maven 配置文件激活多个 Spring 配置文件

    我想在 Maven 中构建一个环境 在其中我想根据哪些 Maven 配置文件处于活动状态来累积激活多个 spring 配置文件 目前我的 pom xml 的相关部分如下所示
  • 加速代码 - 3D 数组

    我正在尝试提高我编写的一些代码的速度 我想知道从 3d 整数数组访问数据的效率如何 我有一个数组 int cube new int 10 10 10 我用价值观填充其中 然后我访问这些值数千次 我想知道 由于理论上所有 3d 数组都存储在内
  • Spark 1.3.1 上的 Apache Phoenix(4.3.1 和 4.4.0-HBase-0.98)ClassNotFoundException

    我正在尝试通过 Spark 连接到 Phoenix 并且在通过 JDBC 驱动程序打开连接时不断收到以下异常 为简洁起见 下面是完整的堆栈跟踪 Caused by java lang ClassNotFoundException org a
  • 路径中 File.separator 和斜杠之间的区别

    使用有什么区别File separator和一个正常的 在 Java 路径字符串中 与双反斜杠相反 平台独立性似乎不是原因 因为两个版本都可以在 Windows 和 Unix 下运行 public class SlashTest Test
  • 十进制到八进制的转换[重复]

    这个问题在这里已经有答案了 可能的重复 十进制转换错误 https stackoverflow com questions 13142977 decimal conversion error 我正在为一个类编写一个程序 并且在计算如何将八进
  • asp.net core / kestrel中的线程管理

    我正在解决我们已迁移到 asp net core 2 0 的 asp net 应用程序的性能 可扩展性问题 我们的应用程序作为应用程序服务托管在 azure 上 并且在任何中等流量的情况下都很容易崩溃 让我困惑的一件事是如何处理多个并发请求
  • 禁止的软件包名称:java

    我尝试从数据库名称为 jaane 用户名 Hello 和密码 hello 获取数据 错误 java lang SecurityException Prohibited package name java at java lang Class
  • Java TestNG 与跨多个测试的数据驱动测试

    我正在电子商务平台中测试一系列商店 每个商店都有一系列属性 我正在考虑对其进行自动化测试 是否有可能有一个数据提供者在整个测试套件中提供数据 而不仅仅是 TestNG 中的测试 我尝试不使用 testNG xml 文件作为机制 因为这些属性
  • getResourceAsStream() 可以找到 jar 文件之外的文件吗?

    我正在开发一个应用程序 该应用程序使用一个加载配置文件的库 InputStream in getClass getResourceAsStream resource 然后我的应用程序打包在一个 jar文件 如果resource是在里面 ja
  • 在 Mac 上正确运行基于 SWT 的跨平台 jar

    我一直致力于一个基于 SWT 的项目 该项目旨在部署为 Java Web Start 从而可以在多个平台上使用 到目前为止 我已经成功解决了由于 SWT 依赖的系统特定库而出现的导出问题 请参阅相关thread https stackove
  • 仅将 char[] 的一部分复制到 String 中

    我有一个数组 char ch 我的问题如下 如何将 ch 2 到 ch 7 的值合并到字符串中 我想在不循环 char 数组的情况下实现这一点 有什么建议么 感谢您花时间回答我的问题 Use new String value offset
  • simpleframework,将空元素反序列化为空字符串而不是 null

    我使用简单框架 http simple sourceforge net http simple sourceforge net 在一个项目中满足我的序列化 反序列化需求 但在处理空 空字符串值时它不能按预期工作 好吧 至少不是我所期望的 如
  • 获取 JVM 上所有引导类的列表?

    有一种方法叫做findBootstrapClass对于一个类加载器 如果它是引导的 则返回一个类 有没有办法找到类已经加载了 您可以尝试首先通过例如获取引导类加载器呼叫 ClassLoader bootstrapLoader ClassLo
  • 静态变量的线程安全

    class ABC implements Runnable private static int a private static int b public void run 我有一个如上所述的 Java 类 我有这个类的多个线程 在里面r
  • 有没有办法为Java的字符集名称添加别名

    我收到一个异常 埋藏在第 3 方库中 消息如下 java io UnsupportedEncodingException BIG 5 我认为发生这种情况是因为 Java 没有定义这个名称java nio charset Charset Ch
  • C# - OutOfMemoryException 在 JSON 文件上保存列表

    我正在尝试保存压力图的流数据 基本上我有一个压力矩阵定义为 double pressureMatrix new double e Data GetLength 0 e Data GetLength 1 基本上 我得到了其中之一pressur
  • 将 List 转换为 JSON

    Hi guys 有人可以帮助我 如何将我的 HQL 查询结果转换为带有对象列表的 JSON 并通过休息服务获取它 这是我的服务方法 它返回查询结果列表 Override public List

随机推荐

  • 属性不应返回数组

    是的 我知道这个问题之前已经讨论过很多次了 我阅读了有关这个问题的所有帖子和评论 但似乎仍然无法理解一些东西 MSDN 提供的解决此违规问题的选项之一是返回收藏 or an 界面这是由一个实现的收藏 在访问该属性时 无论它多么明显并不能解决
  • Flex 默认规则

    如何自定义 Flex 的默认操作 我发现类似 的内容 但当我运行它时 它显示 柔性扫描仪卡住 还有 规则仅添加一条规则 因此它也不起作用 我想要的是 comment comment return 1 default return 0 lt
  • &'a T 是否意味着 T: 'a?

    从我自己的理解和实验来看 这似乎是正确的 但我还没有找到记录它的权威来源 Rust by Example 有一个bounds https doc rust lang org rust by example scope lifetime li
  • Google Play 中不允许使用 com.example 包名称

    我是 Android 应用程序开发新手 我正在创建简单的 Android 应用程序 导出 android 应用程序并将包名称指定为AndroidManifestfile包名为 com example zingyminds apk 现在我得到
  • 使用 FFT 进行高斯图像滤波

    对于图像分割 我使用 OpenCV 的高斯特征差异GaussianBlur 范围从 0 8 到 8 43 指数步长为 1 4 我的图像尺寸为 4096 x 2160 因此这需要相当长的时间 在一个核心上需要 8 秒 这在处理视频时相当长 您
  • 如何更改活动/选定选项卡的颜色?

    我希望当未选择选项卡时颜色为默认灰色 但作为我的颜色tabBarColor选择选项卡时的颜色 我找不到更改标签栏中标题颜色的方法 我怎样才能做到这一点 这是我的代码 Home screen TabNavigator Home screen
  • lub(T1,...Tn) 是什么意思?

    以下引用来自 JLS 14 20 http docs oracle com javase specs jls se8 html jls 14 html jls 14 20 异常参数的声明类型 将其类型表示为 与替代方案 D1 的结合D2 D
  • OpenCV 霍​​夫圆

    我使用 Xcode 和 c 我已经从以下位置复制了 HoughCircles 代码OpenCV 文档 http opencv willowgarage com documentation cpp feature detection html
  • 在 SageMath 中运行时使用 Dask 会抛出 ImportError

    最近 我一直在尝试使用 Dask 并行化一些 Sage 运行 OSX 11 2 3 的 MacBook Pro 上的 Sage 9 4 代码 我遇到的问题是 虽然我可以在 Sage 中运行 Dask 但每当我包含任何非 纯 python 代
  • 我们可以将事件侦听器添加到“Vega-Lite”规范吗?

    我是 Vega 和 Vega Lite 的新手 我正在使用 Vega Lite 创建一个简单的条形图 但我无法添加任何事件侦听器 例如 徘徊 我想将鼠标悬停在一个栏上并更改该栏的颜色 如果您正在使用Vega嵌入 https github c
  • 如何收到图库应用程序可见的每个新图像的通知?

    背景 当用户下载新图像或使用相机捕获图像时 图库应用程序将更新以显示新图像 我需要在创建每个新图像后立即收到通知 无论它是如何创建的 相机 浏览器 就像图库应用程序所示 问题 事实证明 有一个 mediaScanner Android 组件
  • Curl:传输已关闭,剩余未完成的读取数据

    我遇到了大卷曲调用的问题 I get nread 传输已关闭 剩余未完成的读取数据 并且内容已部分交付 GET stats stats breakdown track track campaign search criteria 2 per
  • 无法生成用于构建和调试的资产。 OmniSharp 服务器未运行

    在 Visual Studio VS Code 上 使用 C 进行编码 我正在尝试生成要构建和调试的资产 但收到以下错误消息 无法生成用于构建和调试的资产 OmniSharp 服务器未运行 我在跑 NET版本3 1 301 视窗8 1 Vi
  • 从小表中删除重复行

    我在 PostgreSQL 8 3 8 数据库中有一个表 该表没有键 约束 并且有多行具有完全相同的值 我想删除所有重复项并仅保留每行的 1 个副本 特别有一列 名为 key 可用于识别重复项 即每个不同的 key 应该只存在一个条目 我怎
  • 需要在ggplot2中绘制条形图(以百分位方式)

    嗨 我有一个这样的数据集 ALL Critical Error Warning Review 2016 1412 475 4 125 154 45 49 2 58 116 86 12 1 17 我想使用 ggplot2 绘制堆叠条形图 其中
  • 混合构造函数并在 Javascript 代理对象上应用陷阱

    我有一个类 我想对其应用代理 观察方法调用和构造函数调用 计算器 js class Calc constructor add a b return a b minus a b return a b module exports Calc i
  • 仅查看 Mercurial 中的目录?

    如何仅从 Mercurial 存储库中查看子目录 看来我只能查看整个存储库 你不能 请参阅此处的讨论 https www mercurial scm org wiki PartialClone https www mercurial scm
  • SparkSQL CSV 的引用不明确

    我正在尝试在 SparkSQL 2 10 中读取一堆 CSV 文件 其自定义架构部分是 Double 部分是 String 如下所示 Build the schema val schemaStringS col1 col2 val sche
  • MatPlotLib 修改自定义线性分段颜色图

    关于已接受的答复这个问题 https stackoverflow com questions 38882233 geopandas matplotlib plot custom colors 38885389 38885389 如果我想使用
  • Java 期货管道

    我正在努力优化我的Future的管理技术 假设我们有这种典型的处理场景 我运行一个查询以从数据库中获取一些记录 SELECT FROM mytable WHERE mycondition 该查询返回很多我需要处理的行 例如 while re