Spring Batch - 远程分区

2023-12-20

目前,我们正在将批处理作业从 java 迁移到 Spring Batch。该批处理作业从数据库和 Web 服务获取输入。我们需要在四台服务器上运行此作业以提高性能,因为此作业正在处理大量数据。

上述场景可以通过Spring Batch中的远程分区来实现吗?


我想分享远程分区示例。你可以找到所有来源here https://github.com/gredwhite/spring-batch-remote-execution/tree/master/src/main/java/my/batch/experiments/remote/partitioning

硕士申请:

批量配置:

@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
@Import(value = {BrokerConfiguration.class})
public class MasterConfiguration {

    private static final int GRID_SIZE = 3;

    private final JobBuilderFactory jobBuilderFactory;

    private final RemotePartitioningMasterStepBuilderFactory masterStepBuilderFactory;


    public MasterConfiguration(JobBuilderFactory jobBuilderFactory,
                               RemotePartitioningMasterStepBuilderFactory masterStepBuilderFactory) {

        this.jobBuilderFactory = jobBuilderFactory;
        this.masterStepBuilderFactory = masterStepBuilderFactory;
    }

    /*
     * Configure outbound flow (requests going to workers)
     */
    @Bean
    public DirectChannel requests() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from(requests())
                .handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
                .get();
    }

    /*
     * Configure inbound flow (replies coming from workers)
     */
    @Bean
    public DirectChannel replies() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
                .channel(replies())
                .get();
    }

    /*
     * Configure the master step
     */
    @Bean
    public Step masterStep() {
        return this.masterStepBuilderFactory.get("masterStep")
                .partitioner("workerStep", new BasicPartitioner())
                .gridSize(GRID_SIZE)
                .outputChannel(requests())
                .inputChannel(replies())
                .build();
    }

    @Bean
    public Job remotePartitioningJob() {
        return this.jobBuilderFactory.get("remotePartitioningJobMy")
                .incrementer(new RunIdIncrementer())
                .start(masterStep())
                .build();
    }

} 

分区器:

public class BasicPartitioner extends SimplePartitioner {

    private static final String PARTITION_KEY = "partition";

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> partitions = super.partition(gridSize);
        int i = 0;
        for (ExecutionContext context : partitions.values()) {
            context.put(PARTITION_KEY, PARTITION_KEY + (i++));
        }
        return partitions;
    }

}  

经纪人配置:

@Configuration
@PropertySource("classpath:remote-partitioning.properties")
public class BrokerConfiguration {

    @Value("${broker.url}")
    private String brokerUrl;

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(this.brokerUrl);
        connectionFactory.setTrustAllPackages(true);
        return connectionFactory;
    }

}

starter:

@EnableBatchProcessing
@SpringBootApplication
@Import({BasicPartitioner.class, BrokerConfiguration.class})
public class MasterApplication {

    @Value("${broker.url}")
    private String brokerUrl;

    public static void main(String[] args) {
        SpringApplication.run(MasterApplication.class, args);
    }


    @PostConstruct
    public void init() throws Exception {
        BrokerService broker = new BrokerService();
        broker.addConnector(brokerUrl);
        broker.start();
    }
}

从机应用:

config:

@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
@Import(value = {BrokerConfiguration.class})
public class WorkerConfiguration {

    private final RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;


    public WorkerConfiguration(RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory) {
        this.workerStepBuilderFactory = workerStepBuilderFactory;
    }

    /*
     * Configure inbound flow (requests coming from the master)
     */
    @Bean
    public DirectChannel requests() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
                .channel(requests())
                .get();
    }

    /*
     * Configure outbound flow (replies going to the master)
     */
    @Bean
    public DirectChannel replies() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from(replies())
                .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
                .get();
    }

    /*
     * Configure the worker step
     */
    @Bean
    public Step workerStep() {
        return this.workerStepBuilderFactory.get("workerStep")
                .inputChannel(requests())
                .outputChannel(replies())
                .tasklet(tasklet(null))
                .build();
    }

    @Bean
    @StepScope
    public Tasklet tasklet(@Value("#{stepExecutionContext['partition']}") String partition) {
        return (contribution, chunkContext) -> {
            System.out.println("processing " + partition);
            return RepeatStatus.FINISHED;
        };
    }

} 

starter:

@EnableBatchProcessing
@SpringBootApplication
@Import({BrokerConfiguration.class})

public class WorkerApplication {
    public static void main(String[] args) {
        SpringApplication.run(WorkerApplication.class, args);
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spring Batch - 远程分区 的相关文章

  • 为什么 i++ 不是原子的?

    Why is i Java 中不是原子的 为了更深入地了解 Java 我尝试计算线程中循环的执行频率 所以我用了一个 private static int total 0 在主课中 我有两个线程 主题 1 打印System out prin
  • Java中反射是如何实现的?

    Java 7 语言规范很早就指出 本规范没有详细描述反射 我只是想知道 反射在Java中是如何实现的 我不是问它是如何使用的 我知道可能没有我正在寻找的具体答案 但任何信息将不胜感激 我在 Stackoverflow 上发现了这个 关于 C
  • 如何在 Play java 中创建数据库线程池并使用该池进行数据库查询

    我目前正在使用 play java 并使用默认线程池进行数据库查询 但了解使用数据库线程池进行数据库查询可以使我的系统更加高效 目前我的代码是 import play libs Akka import scala concurrent Ex
  • Java JDBC:更改表

    我希望对此表进行以下修改 添加 状态列 varchar 20 日期列 时间戳 我不确定该怎么做 String createTable Create table aircraft aircraftNumber int airLineCompa
  • 在 HTTPResponse Android 中跟踪重定向

    我需要遵循 HTTPost 给我的重定向 当我发出 HTTP post 并尝试读取响应时 我得到重定向页面 html 我怎样才能解决这个问题 代码 public void parseDoc final HttpParams params n
  • Final字段的线程安全

    假设我有一个 JavaBeanUser这是从另一个线程更新的 如下所示 public class A private final User user public A User user this user user public void
  • JAXb、Hibernate 和 beans

    目前我正在开发一个使用 Spring Web 服务 hibernate 和 JAXb 的项目 1 我已经使用IDE hibernate代码生成 生成了hibernate bean 2 另外 我已经使用maven编译器生成了jaxb bean
  • Spark 1.3.1 上的 Apache Phoenix(4.3.1 和 4.4.0-HBase-0.98)ClassNotFoundException

    我正在尝试通过 Spark 连接到 Phoenix 并且在通过 JDBC 驱动程序打开连接时不断收到以下异常 为简洁起见 下面是完整的堆栈跟踪 Caused by java lang ClassNotFoundException org a
  • 反射找不到对象子类型

    我试图通过使用反射来获取包中的所有类 当我使用具体类的代码 本例中为 A 时 它可以工作并打印子类信息 B 扩展 A 因此它打印 B 信息 但是当我将它与对象类一起使用时 它不起作用 我该如何修复它 这段代码的工作原理 Reflection
  • 如何在PreferenceActivity中添加工具栏

    我已经使用首选项创建了应用程序设置 但我注意到 我的 PreferenceActivity 中没有工具栏 如何将工具栏添加到我的 PreferenceActivity 中 My code 我的 pref xml
  • 为什么HashMap不能保证map的顺序随着时间的推移保持不变

    我在这里阅读有关 Hashmap 和 Hashtable 之间的区别 http javarevisited blogspot sg 2010 10 difference Between hashmap and html http javar
  • AWS 无法从 START_OBJECT 中反序列化 java.lang.String 实例

    我创建了一个 Lambda 函数 我想在 API 网关的帮助下通过 URL 访问它 我已经把一切都设置好了 我还创建了一个application jsonAPI Gateway 中的正文映射模板如下所示 input input params
  • Eclipse Java 远程调试器通过 VPN 速度极慢

    我有时被迫离开办公室工作 这意味着我需要通过 VPN 进入我的实验室 我注意到在这种情况下使用 Eclipse 进行远程调试速度非常慢 速度慢到调试器需要 5 7 分钟才能连接到远程 jvm 连接后 每次单步执行断点 行可能需要 20 30
  • Java执行器服务线程池[关闭]

    很难说出这里问的是什么 这个问题是含糊的 模糊的 不完整的 过于宽泛的或修辞性的 无法以目前的形式得到合理的回答 如需帮助澄清此问题以便重新打开 访问帮助中心 help reopen questions 如果我使用 Executor 框架在
  • 如何从终端运行处理应用程序

    我目前正在使用加工 http processing org对于一个小项目 但是我不喜欢它附带的文本编辑器 我使用 vim 编写所有代码 我找到了 pde 文件的位置 并且我一直在从 vim 中编辑它们 然后重新打开它们并运行它们 重新加载脚
  • Android 中麦克风的后台访问

    是否可以通过 Android 手机上的后台应用程序 服务 持续监控麦克风 我想做的一些想法 不断聆听背景中的声音信号 收到 有趣的 音频信号后 执行一些网络操作 如果前台应用程序需要的话 后台应用程序必须能够智能地放弃对麦克风的访问 除非可
  • 静态变量的线程安全

    class ABC implements Runnable private static int a private static int b public void run 我有一个如上所述的 Java 类 我有这个类的多个线程 在里面r
  • 在 Maven 依赖项中指定 jar 和 test-jar 类型

    我有一个名为 commons 的项目 其中包含运行时和测试的常见内容 在主项目中 我添加了公共资源的依赖项
  • 当我从 Netbeans 创建 Derby 数据库时,它存储在哪里?

    当我从 netbeans 创建 Derby 数据库时 它存储在哪里 如何将它与项目的其余部分合并到一个文件夹中 右键单击Databases gt JavaDB in the Service查看并选择Properties This will
  • 节拍匹配算法

    我最近开始尝试创建一个移动应用程序 iOS Android 它将自动击败比赛 http en wikipedia org wiki Beatmatching http en wikipedia org wiki Beatmatching 两

随机推荐

  • 在我的网站上添加最新推文作为纯文本?

    我有一个 HTML CSS 网站 想要在页脚部分添加我最新的推文 请注意以下几点 我不喜欢他们网站上可用的 Twitter 小部件的外观 我只想将我最新的推文显示为文本 只是最后一条 而不是几条 然后我将添加背景并通过 CSS 自定义字体
  • 如何使用 jasmine 测试带有 DOM 元素的 JavaScript?

    我正在尝试在 jasmine 中为包含 DOM 元素的 JavaScript 代码编写一个测试 但该测试不起作用 当我仅测试 JavaScript 代码 只是简单的函数 但不适用于 DOM 元素 时 它可以工作 我研究了这么久 确实找不到答
  • CollectionView Cell 在水平滚动后向右移动

    我有一个与 CollectionView 大小相同的 collectionView Cell 即一次在屏幕上显示一个 Cell 并且我希望单元格之间的最小间距为 10 问题是当我滚动单元格时 单元格无法正确贴合整个屏幕 并且每次滚动后单元格
  • 将 &T 迭代器收集到 T 集合中的惯用方法是什么?

    我需要收集一个迭代器 strs 进入一个集合 strs 问题是迭代器产生 strs 我尝试从映射 word to word 虽然它有效 但我不知道它是否被认为是好的 或者是否有更好的选择 问题 use std collections Has
  • Cmake:如何使用 cmake 将 rpath 设置为 ${ORIGIN}

    根据这个SO问题 Linux 可执行文件在同一文件夹中找不到共享库 https stackoverflow com questions 39978762 linux executable cant find shared library i
  • 在类定义中定义的 C++ 成员函数中是否隐含“内联”

    根据C 规范 以下两个类的定义是否等效 class A void f class B inline void f 即 将 内联 限定符放在类定义中定义的此类成员函数上是否完全多余 后续问题 假设它是多余的 对于代码风格 保留 内联 标签是否
  • 为什么 localhost 与 127.0.0.1 在会话方面存在差异

    我想知道为什么这两个会话有区别 如果我有一个登录表单 它将会话传递到一个页面 即 settings php 如果我有localhost settings php如果我转到不同的页面并返回 该会话就会起作用 但如果是的话127 0 0 1 s
  • 从批处理文件中逐个启动宏

    在 Firefox 中 使用 Imacros 我想从批处理文件启动多个宏 但问题是 我希望它们一一运行 因此 首先将运行 宏 1 然后在完成后运行 宏 2 依此类推 直到 宏 7 我的批次代码 cd C Program Files Mozi
  • 如何确定 UIWebView 中 iFrame 加载完成

    在我的应用程序中 我需要一种方法来判断我的网络视图何时完成加载 如果内容是 html 则很容易做到这一点 但是 我的内容源是带有 iFrame 的 javascript 这会导致 UIWebView finishLoad 方法被调用多次 无
  • 使用 GCM 在应用程序上发送通知,返回 InvalidRegistration 错误

    我尝试在 Android 设备上使用 GCM 发送通知 但总是收到 InvalidRegistration 错误 以下是应该发送通知的 PHP 代码
  • Xamarin - 如何升级 Android 应用程序并将文件保存在 android 数据文件夹下

    我需要部署使用 Xamarin 制作的新版本 Android 应用程序 我的应用程序将文件存储在以下路径中 android gt 数据 gt APP PACKAGE NAME gt 文件 gt 数据 问题是这些文件在升级后被擦除 并且应用程
  • Rails - 如何接受 JSON 对象数组

    如何在我的 Rails 站点上接受 JSON 对象数组 我发布类似的内容 team name Titans 但是 如果我尝试发布包含对象数组的 JSON 它只保存第一个对象 team name Titans name Dragons nam
  • 无法正确分配内存并且无法在我的 (ft_split) 函数中释放内存

    我正在尝试创建一个ft split函数应该 使用 malloc 分配并返回通过使用字符 c 分割 s 获得的字符串数组 作为分隔符 该数组必须以 NULL 指针结尾 我已将所有代码包含在下面的代码示例中 以及调试器给出错误之一的断点 现在遇
  • 将指针绑定到 C++ 中的成员运算符

    它们有什么意义呢 我从未将它们用于任何用途 而且我根本不认为自己需要使用它们 我是否遗漏了一些关于它们的信息 或者它们几乎毫无用处 编辑 我对它们了解不多 所以可能有必要对它们进行描述 PMF 指向成员函数的指针 类似于普通 静态 函数指针
  • 调用 MS Access 查询,该查询从 Delphi ADO 组件调用模块中的 VBA 函数

    我在 MS Access 模块中创建了一个函数 比方说Calculate A B 我还在 MS Access 中创建了一个查询来使用此函数 比方说 UPDATE aTable SET aField Calculate bField cFie
  • 如何停止忽略子树子目录中的文件模式?

    我正在将 Git 用于我自己的 Unity 项目 我是唯一的开发人员 所以我可以使用破坏性命令 Unity 开发人员可能知道 Unity 为 Assets 文件夹 项目的主文件夹 下的所有内容创建 meta 文件 这些文件 必须 与原始文件
  • 如何在 Asana 中按用户获取任务

    我刚刚开始使用 Asana API 让自己成为一个小工具来概览分配给我的所有任务 我可以做各种各样的事情 比如获取所有工作组 项目 用户 项目任务等 但不知何故 我找不到如何获取每个用户的所有任务 当您登录 Asana 时 您可以概览分配给
  • 释放内存时堆损坏

    我有一堂课如下 struct CliHandler CliHandler int argc char argv CliHandler int doWork int argc char argv private CliHandler cons
  • 在jupyterlab中使用plot.ly - 图形不显示

    我想用 pylot 绘制 3D PCA 散点3d 但是图形没有显示在朱皮特拉实验室只有在jupyter笔记本 我已经安装了 jupyterlab plotly 包 并且能够创建 jupyterlab plotly Plotly 对象 但我无
  • Spring Batch - 远程分区

    目前 我们正在将批处理作业从 java 迁移到 Spring Batch 该批处理作业从数据库和 Web 服务获取输入 我们需要在四台服务器上运行此作业以提高性能 因为此作业正在处理大量数据 上述场景可以通过Spring Batch中的远程