Spring Batch - 并非所有记录都通过 MQ 检索进行处理

2024-01-06

我对 Spring 和 Spring Batch 相当陌生,所以如果您有任何疑问,请随时提出任何澄清问题。

我发现 Spring Batch 存在问题,无法在测试或本地环境中重新创建。我们的日常工作是通过 JMS 连接到 Websphere MQ 并检索一组记录。此作业使用开箱即用的 JMS ItemReader。我们实现了自己的 ItemProcessor,但除了日志记录之外它没有做任何特殊的事情。没有任何过滤器或处理会影响传入记录。

问题是,在 MQ 上的 10,000 多条日常记录中,通常只有大约 700 条左右(每次的确切数字不同)会记录在 ItemProcessor 中。所有记录均已成功从队列中拉出。每次记录的记录数量都不同,并且似乎没有规律。通过将日志文件与 MQ 中的记录列表进行比较,我们可以看到我们的作业正在“处理”看似随机的记录子集。第一条记录可能会被选取,然后跳过 50 条记录,然后连续 5 条记录,依此类推。每次作业运行时,模式都不同。也没有记录任何异常。

在本地主机中运行相同的应用程序并使用相同的数据集进行测试时,ItemProcessor 成功检索并记录了所有 10,000 多条记录。该作业在生产中运行 20 到 40 秒(也不是恒定的),但在测试和本地需要几分钟才能完成(这显然是有道理的,因为它要处理更多的记录)。

因此,这是难以解决的问题之一,因为我们无法重现它。一个想法是实现我们自己的 ItemReader 并添加额外的日志记录,以便我们可以查看记录是否在读取器之前或之后丢失 - 我们现在所知道的是 ItemProcessor 只处理记录的子集。但即使这样也不能解决我们的问题,而且考虑到它甚至不是一个解决方案,实施它也有点及时。

还有其他人见过这样的问题吗?任何可能的想法或故障排除建议将不胜感激。以下是我们用来参考的一些 jar 版本号。

  • 春季 - 3.0.5.RELEASE
  • Spring 集成 - 2.0.3.RELEASE
  • 春季批次 - 2.1.7.RELEASE
  • Activemq-5.4.2
  • Websphere MQ - 7.0.1

预先感谢您的意见。

编辑:根据请求,处理器代码:

public SMSReminderRow process(Message message) throws Exception {

    SMSReminderRow retVal = new SMSReminderRow();
    LOGGER.debug("Converting JMS Message to ClaimNotification");
    ClaimNotification notification = createClaimNotificationFromMessage(message);

    retVal.setShortCode(BatchCommonUtils
            .parseShortCodeFromCorpEntCode(notification.getCorpEntCode()));
    retVal.setUuid(UUID.randomUUID().toString());
    retVal.setPhoneNumber(notification.getPhoneNumber());
    retVal.setMessageType(EventCode.SMS_CLAIMS_NOTIFY.toString());

    DCRContent content = tsContentHelper.getTSContent(Calendar
            .getInstance().getTime(),
            BatchCommonConstants.TS_TAG_CLAIMS_NOTIFY,
            BatchCommonConstants.TS_TAG_SMSTEXT_TYP);

    String claimsNotificationMessage = formatMessageToSend(content.getContent(),
            notification.getCorpEntCode());

    retVal.setMessageToSend(claimsNotificationMessage);
    retVal.setDateTimeToSend(TimeUtils
            .getGMTDateTimeStringForDate(new Date()));

    LOGGER.debug(
            "Finished processing claim notification for {}. Writing row to file.",
            notification.getPhoneNumber());
    return retVal;
}

JMS配置:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
<bean id="claimsQueueConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiName" value="jms/SMSClaimNotificationCF" />
    <property name="lookupOnStartup" value="true" />
    <property name="cache" value="true" />
    <property name="proxyInterface" value="javax.jms.ConnectionFactory" />
</bean>

<bean id="jmsDestinationResolver"
    class="org.springframework.jms.support.destination.DynamicDestinationResolver">
</bean>

<bean id="jmsJndiDestResolver" 
    class=" org.springframework.jms.support.destination.JndiDestinationResolver"/>  

<bean id="claimsJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="claimsQueueConnectionFactory" />
    <property name="defaultDestinationName" value="jms/SMSClaimNotificationQueue" />
    <property name="destinationResolver" ref="jmsJndiDestResolver" />
    <property name="pubSubDomain">
        <value>false</value>
    </property>
    <property name="receiveTimeout">
        <value>20000</value>
    </property>
</bean>

一般来说,正确配置后,MQ 不会丢失消息。那么问题是“正确配置”是什么样子的?

通常,丢失消息是由非持久性或非事务性 GET 引起的。

如果非持久消息正在遍历 QMgr 到 QMgr 通道并且NPMSPEED(FAST)设置后,如果错误丢失,MQ 将不会记录错误。这就是这些选项的用途,因此不会出现错误。

修复:设置NPMSPEED(NORMAL)在 QMgr 到 QMgr 通道上或使消息持久化。

如果客户端在同步点之外获取消息,则消息可能会丢失。这与 MQ 无关,这只是消息传递的一般工作方式。如果您告诉 MQ 破坏性地从队列中获取一条消息,并且它无法将该消息传送到远程应用程序,那么 MQ 回滚该消息的唯一方法是在同步点下检索该消息。

修复:使用事务处理会话。

还有一些根据经验而产生的附加注释。

  • 每个人都发誓消息持久性设置为他们认为的那样。但是当我停止应用程序并手动检查消息时very经常是not预期是什么。很容易验证,所以不要假设。
  • 如果消息在队列上回滚,则直到 MQ 或 TCP 使孤立通道超时才会发生这种情况。这可能长达 2 小时,因此请调整通道参数和 TCP Keepalive 以减少该时间。
  • 检查 MQ 的错误日志(QMgr 而非客户端的错误日志)以查找有关事务回滚的消息。
  • 如果您仍然无法确定消息的去向,请尝试使用支持Pac MA0W http://ibm.co/SupptPacMA0W。该跟踪作为出口运行,它是极其可配置。你可以追踪所有GET仅对单个队列进行操作。输出是人类可读的形式。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spring Batch - 并非所有记录都通过 MQ 检索进行处理 的相关文章

  • 将一种类型的对象声明为另一种类型的实例有什么好处? [复制]

    这个问题在这里已经有答案了 可能的重复 Base b2 new Child 是什么意思 表示 https stackoverflow com questions 4447924 what does base b2 new child sig
  • 如何配置 Spring-WS 以使用 JAXB Marshaller?

    感谢您到目前为止对此的帮助 我正在更新问题 因为我没有显示我需要的所有内容 并显示了建议的更改 肥皂输出仍然不是我想要的 servlet xml
  • 如何在Java中优雅地处理SIGKILL信号

    当程序收到终止信号时如何处理清理 例如 我连接到一个应用程序 希望任何第三方应用程序 我的应用程序 发送finish注销时的命令 发送该信息最好说什么finish当我的应用程序被破坏时的命令kill 9 编辑1 kill 9无法被捕获 谢谢
  • 项目缺少所需的注释处理库

    我的 Eclipse IDE 突然在问题视图中显示 xxxx 项目缺少所需的注释处理库 xxxx M2 REPO 中的一些旧 jar 我用谷歌搜索 没有找到任何答案 为什么我的项目使用旧的 jar 以及错误来自哪里 To remove th
  • 具有 CRUD 功能的基于 Spring Web 的管理工具

    在 PHP Symfony 世界里有一个工具叫 Sonata Adminhttps sonata project org https sonata project org 基于 AdminLTE 模板 这是一款一体化管理工具 具有登录 菜单
  • Kafka Java Consumer 已关闭

    我刚刚开始使用卡夫卡 我面临着消费者的一个小问题 我用Java写了一个消费者 我收到此异常 IllegalStateException 此消费者已关闭 我在以下行中遇到异常 ConsumerRecords
  • 正则表达式在 Velocity 模板中不起作用

    我在 Test java 中尝试过这个 String regex lt s br s s gt String test1 lt br gt System out println test replaceAll regex 但是当我在速度模板
  • 从 HttpClient 3 转换为 4

    我已经成功地对所有内容进行了更改 但以下内容除外 HttpClient client HttpPost method client new DefaultHttpClient method new HttpPost url InputStr
  • 如何配置Spring boot分页从第1页开始,而不是从0开始

    boot 1 4 0 可分页 用于分页 它工作正常 没有任何问题 但默认情况下 页面值从 0 开始 但在前端 页面值从 1 开始 那么是否有任何标准方法来增加值而不是手动增加代码内的页码 public Page
  • Cucumber DataTable 错误 - io.cucumber.datatable.UndefinedDataTableTypeException:无法将 DataTable 转换为 cucumber.api.DataTable

    尝试使用 cucumber selenium java intelliJ 运行场景 但在其中一个步骤中出现有关 DataTable 的错误 在我开始使用测试运行程序并更改周围的一些内容之前 数据表工作正常并正确转换该步骤的参数 但我就是无法
  • 如何使用 swagger-codegen-plugin (maven) 生成客户端代码?

    我需要使用 swagger codegen plugin for maven 在 eclipse 中生成服务器存根代码 你能帮忙怎么做吗 以及需要什么配置 在 pom xml 中 我找到了这个答案 您只需要像下面这样更改 pom xml 即
  • 从 Android 访问云存储

    我一直无法找到任何有关如何从 Android 应用程序使用云存储的具体文档 我确实遇到过这个客户端库 https cloud google com storage docs reference libraries然而 Google Clou
  • JAXB 编组器无参数默认构造函数

    我想从 java 库中编组一个 java 对象 当使用 JAXB marschaller 编组 java 对象时 我遇到了一个问题 A 类没有无参数默认构造函数 我使用Java Decompiler来检查类的实现 它是这样的 public
  • 在循环中按名称访问变量

    我正在开发一个 Android 项目 并且有很多可绘制对象 这些绘图的名称都类似于icon 0 png icon 1 png icon 100 png 我想将这些可绘制对象的所有资源 ID 添加到整数 ArrayList 中 对于那些不了解
  • Java和手动执行finalize

    如果我打电话finalize 在我的程序代码中的一个对象上 JVM当垃圾收集器处理这个对象时仍然再次运行该方法吗 这是一个大概的例子 MyObject m new MyObject m finalize m null System gc 是
  • JPA 将 BigDecimal 作为整数保存在数据库中

    我在数据库中有这个字段 ITEMCOST NUMERIC 13 DEFAULT 0 NOT NULL 在JAVA中 Entity中的字段定义如下 Column name ITEMCOST private BigDecimal itemCos
  • CXF:通过 SOAP 发送对象时如何排除某些属性?

    我使用 Apache CXF 2 4 2 当我将数据库中的某个对象返回给用户时 我想排除一些属性 例如密码 我怎样才能做到这一点无需创建临时的班级 有这方面的注释吗 根据 tomasz nurkiewicz 评论我应该使用 XmlTrans
  • 如何清理 Runtime.exec() 中使用的用户输入?

    我需要通过命令行调用自定义脚本 这些脚本需要很少的参数并在 Linux 机器上调用 当前版本容易出现各种shell注入 如何清理用户给出的参数 参数包括登录名和路径 Unix 或 Windows 路径 用户应该能够输入任何可能的路径 该路径
  • 使用自定义比较器在 Java 中创建 SortedMap

    我想创建一个TreeMap在 Java 中具有自定义排序顺序 排序后的键是字符串 需要根据第二个字符进行排序 这些值也是字符串 示例地图 Za FOO Ab Bar 您可以像这样使用自定义比较器 Comparator
  • 如何使用注释处理 Hibernate 和 Spring 中的连接查询?

    我正在使用 Spring 和 Hibernate 以及 MySQL 开发应用程序 我是 Hibernate 新手 完成了基本任务 现在我需要在选择查询中应用联接以使用注释从多个表中获取数据 我已经搜索过但仍然没有任何想法 这是我的数据库表和

随机推荐

  • 使用 pdfmake 在段落周围添加边框

    我正在通过生成pdfpdfmake http bpampuch github io pdfmake 假设我有这样的 pdf 内容 var docDefinition content Lorem ipsum dolor sit amet co
  • Python 中的字符串是池化的吗?

    Python 是否有一个所有字符串的池 并且它们 字符串 是单例吗 更准确地说 在下面的代码中 是在内存中创建了一个还是两个字符串 a str num b str num 字符串在 Python 中是不可变的 因此实现可以决定是否实习 这是
  • heroku 上的 Hapi 服务器无法绑定端口

    我正在为 ReactJS 应用程序开发 Hapi 服务器 但当我尝试部署到 Heroku 时 收到 R10 错误 无法在启动后 60 秒内绑定到 PORT 到底是怎么回事 我正在使用 process env PORT 我也尝试过 parse
  • Web 应用程序安全的好指南? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我非常擅长制作网络应用程序 并且知道如何在客户端 服务器等之间传输数据 尽管学习如何使数据交换更安全 但我需要一些帮助 这就是为什么我有点害怕发
  • 如何在 C++ 中存储变量数据

    我正在创建一个存储有关特定数据源的元数据的类 元数据采用树状结构 与 XML 的结构非常相似 元数据值可以是整数 小数或字符串值 我很好奇 C 中是否有一种好方法来存储这种情况的变体数据 我希望变体使用标准库 因此我避免使用可用的 COM
  • Numba:如何抑制

    我的 numba 代码中不断出现此错误 Warning 101 0 Unused argument self 我的 numba 代码如下 如何抑制错误消息 autojit def initialise output data self in
  • 使用 $switch 将分数添加到 MongoDB 聚合

    我正在尝试根据哪个值向我的 mongodb 聚合添加一个分数name字段匹配 例如 如果name与 sitt 完全匹配 得分为100 如果name匹配 sitt i 得分为 50 这是我的代码 db getCollection tags a
  • 构造函数参数的求值顺序[重复]

    这个问题在这里已经有答案了 假设我有这门课 struct A A int int int 我这样初始化它 A a b c 功能在哪里a b and c 全部返回int 应该a 之前被调用b and b before c 我对标准中的以下段落
  • Three.js - 获取鼠标单击的 X、Y 和 Z 坐标

    我在用着版本68三个 js 我想单击某处并获取 X Y 和 Z 坐标 我按照此处的步骤操作 但它们给我的 Z 值为 0 鼠标 画布 X Y 到 Three js 世界 X Y Z https stackoverflow com questi
  • tweepy OAuthHandler 错误

    我是新来的 对 python 没有经验 如果问题很微不足道 很抱歉 我有这个简单的脚本 用于获取给定 Twitter 用户的关注者 import time import tweepy consumer key xxx consumer se
  • 为什么我们需要虚拟表?

    我正在寻找一些有关虚拟表的信息 但找不到任何易于理解的内容 有人能给我一些很好的例子和解释吗 如果没有虚拟表 您将无法使运行时多态性发挥作用 因为对函数的所有引用都将在编译时绑定 一个简单的例子 struct Base virtual vo
  • Ajax 调用以 Base64 字符串形式返回 PDF 文件

    我在 Angular JS 环境中使用 ajax 来调用本地文件 pdf 文件 调用成功 但是ajax调用返回的数据是乱码 不确定我这里使用的术语是否正确 但就像使用文本编辑器打开pdf文件一样 无论如何 我可以得到 base64 字符串的
  • 使用Retrofit 2.0和RxJava获取响应状态代码

    我正在尝试升级到 Retrofit 2 0 并在我的 android 项目中添加 RxJava 我正在进行 api 调用 并希望在服务器出现错误响应时检索错误代码 Observable
  • 初始化列表与初始化方法

    在 C 中至少有两种初始化类的方法 1 初始化列表 struct C int i C i 0 2 初始化方法 struct D int i C init void init i 0 我需要时不时地重新初始化我的类的对象 使用第二种解决方案
  • C:为什么 LLVM 从左到右计算 printf,而 GCC 从右到左计算?

    正如这个问题中所述 LLVM和GCC 不同输出相同的代码 https stackoverflow com questions 15929795 llvm and gcc different output same code LLVM 和 G
  • Android - 从另一个活动完成活动

    有什么办法 如何从堆栈中完成某些活动 我有服务 它会查找更新 当找到更新时 它会打开更新活动 其中会出现安装提示 但是安装出现后我想完成更新活动 因为没有必要仍然在堆栈上 Thanks 如果更新活动正在启动另一个安装活动 那么您可能需要覆盖
  • 反应式的“缓冲直到安静”行为?

    我的问题有点像内格尔算法 https en wikipedia org wiki Nagle 27s algorithm是为了解决问题而创建的 但不完全是为了解决问题 我想要的是缓冲OnNext通知来自IObservable
  • 创建具有相同键和值的对象,而不重复数据

    在ES6中 我们可以这样做 const key foo const myObj key myObj gt foo foo So key 相当于 key key 但是 我们怎样才能创建同一个对象而不需要key多变的 我想要有类似的东西 foo
  • 使用应用程序期间可达性连接丢失时弹出警报(IOS xcode swift)

    我是 IOS 应用程序开发的初学者 希望 在使用应用程序期间失去可达性连接时弹出警报 IOS xcode swift 但我只在启动我的应用程序时收到弹出警报 当互联网连接丢失时 使用我的应用程序时不会弹出警报 请各位好心人帮忙 谢谢 我做了
  • Spring Batch - 并非所有记录都通过 MQ 检索进行处理

    我对 Spring 和 Spring Batch 相当陌生 所以如果您有任何疑问 请随时提出任何澄清问题 我发现 Spring Batch 存在问题 无法在测试或本地环境中重新创建 我们的日常工作是通过 JMS 连接到 Websphere