Hazelcast Jet 查询

2024-02-05

我对 Hazelcast Jet 有以下疑问

用例如下

有一个应用程序(应用程序“A”,部署在集群中)使用 Hazelcast IMDG 并将数百万条记录/事务放入 hazelcast IMap 中。

已为此 IMap 配置事件日志。

还有另一个应用程序(应用程序 B,部署在集群中)实例化 JetInstance 并在每个节点上单独运行作业来处理记录。

目前,此作业从事件日志中读取数据并添加到 IList 中(参考 - hazelcast-jet-0.5.1\code-samples\streaming\map-journal-source\src\main\java\RemoteMapJournalSource.java)

由于作业在多个节点上运行,事件日志中的记录由多个节点处理。这会导致 IList 中出现多个条目。

是否可以确保一条记录仅由“应用程序 B”的一个节点处理,而不由其他节点处理以避免重复?

如果不是,这是否意味着该作业将由“应用程序 B”集群的单个节点运行?

这是示例代码(应用程序 B)

        Pipeline p = Pipeline.create();
        p.drawFrom(Sources.<Integer, Integer, Integer>remoteMapJournal(MAP_NAME, clientConfig,
                e -> e.getType() == EntryEventType.ADDED, EventJournalMapEvent::getNewValue, true))
         .peek()
         .drainTo(Sinks.list(SINK_NAME));

        JobConfig jc= new JobConfig();
        jc.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);

        localJet.newJob(p,jc);

这是完整的代码。

应用程序源代码。

public class RemoteMapJournalSourceSrv1 {

private static final String MAP_NAME = "map";
private static final String SINK_NAME = "list";

public static void main(String[] args) throws Exception {
    System.setProperty("remoteHz.logging.type", "log4j");

    Config hzConfig = getConfig();
    HazelcastInstance remoteHz = startRemoteHzCluster(hzConfig);

    try {
        IMap<Integer, Integer> map = remoteHz.getMap(MAP_NAME);
        System.out.println("*************** Initial Map  address " +  map.size() );

        while(true) {
            System.out.println("***************map size "+map.size());  
            TimeUnit.SECONDS.sleep(20);
        }

    } finally {
        Hazelcast.shutdownAll();
    }
}

private static HazelcastInstance startRemoteHzCluster(Config config) {
    HazelcastInstance remoteHz = Hazelcast.newHazelcastInstance(config);
    return remoteHz;
}

private static Config getConfig() {
    Config config = new Config();
    // Add an event journal config for map which has custom capacity of 1000 (default 10_000)
    // and time to live seconds as 10 seconds (default 0 which means infinite)
    config.addEventJournalConfig(new EventJournalConfig().setEnabled(true)
                                                         .setMapName(MAP_NAME)
                                                         .setCapacity(10000)
                                                         .setTimeToLiveSeconds(100));
    return config;
}

这是应用程序 B - 节点 1 示例代码

public class RemoteMapJournalSourceCL1 {

private static final String MAP_NAME = "map";
private static final String SINK_NAME = "list";

public static void main(String[] args) throws Exception {
    System.setProperty("remoteHz.logging.type", "log4j");

      JetInstance localJet = startLocalJetCluster();

    try {
        ClientConfig clientConfig = new ClientConfig();
        GroupConfig groupConfig = new GroupConfig();

        clientConfig.getNetworkConfig().addAddress("localhost:5701");
        clientConfig.setGroupConfig(groupConfig);

        IList list1 = localJet.getList(SINK_NAME);

        int size1 = list1.size();

        System.out.println("***************List Initial size "+size1);

        Pipeline p = Pipeline.create();
        p.drawFrom(Sources.<Integer, Integer, Integer>remoteMapJournal(MAP_NAME, clientConfig,
                e -> e.getType() == EntryEventType.ADDED, EventJournalMapEvent::getNewValue, false))
         .peek()
         .drainTo(Sinks.list(SINK_NAME));

        JobConfig jc= new JobConfig();
        jc.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);
        localJet.newJob(p,jc);

        while(true){
            TimeUnit.SECONDS.sleep(10); 
            System.out.println("***************Read " + list1.size() + " entries from remote map journal.");
        }           

    } finally {
        Hazelcast.shutdownAll();
        Jet.shutdownAll();
    }

}

private static String getAddress(HazelcastInstance remoteHz) {
    Address address = remoteHz.getCluster().getLocalMember().getAddress();
    System.out.println("***************Remote address " + address.getHost() + ":" + address.getPort() );
    return address.getHost() + ":" + address.getPort();
}

private static JetInstance startLocalJetCluster() {
      JetInstance localJet = Jet.newJetInstance();
    return localJet;
}

这是应用程序 B - 节点 2 示例代码

public class RemoteMapJournalSourceCL2 {

private static final String MAP_NAME = "map";
private static final String SINK_NAME = "list";

public static void main(String[] args) throws Exception {
    System.setProperty("remoteHz.logging.type", "log4j");

      JetInstance localJet = startLocalJetCluster();

    try {
        ClientConfig clientConfig = new ClientConfig();
        GroupConfig groupConfig = new GroupConfig();

        clientConfig.getNetworkConfig().addAddress("localhost:5701");
        clientConfig.setGroupConfig(groupConfig);

        IList list1 = localJet.getList(SINK_NAME);
        int size1 = list1.size();
        System.out.println("***************List Initial size "+size1);


        Pipeline p = Pipeline.create();
        p.drawFrom(Sources.<Integer, Integer, Integer>remoteMapJournal(MAP_NAME, clientConfig,
                e -> e.getType() == EntryEventType.ADDED, EventJournalMapEvent::getNewValue, true))
         .peek()
         .drainTo(Sinks.list(SINK_NAME));

        JobConfig jc= new JobConfig();
        jc.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);

        localJet.newJob(p,jc);

        while(true){
            TimeUnit.SECONDS.sleep(10); 
            System.out.println("***************Read " + list1.size() + " entries from remote map journal.");
        }           
    } finally {
        Hazelcast.shutdownAll();
        Jet.shutdownAll();
    }

}
private static JetInstance startLocalJetCluster() {
      JetInstance localJet = Jet.newJetInstance();
    return localJet;
}

Hazelcast 客户端 - 将条目放入 Hazelcast 地图(应用程序 A)

public class HZClient {

  public static void main(String[] args) {

    ClientConfig clientConfig = new ClientConfig();
    GroupConfig groupConfig = new GroupConfig();

    clientConfig.getNetworkConfig().addAddress("localhost:5701");
    clientConfig.setGroupConfig(groupConfig);

    HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
    IMap<Integer, Integer> map = client.getMap("map");
    Scanner in = new Scanner(System.in);
    int startIndex= 0;
    int endIndex= 0;

    while(true) {
        if(args !=null && args.length > 0 && args[0].equals("BATCH")) {

            System.out.println("Please input the batch size");
            int b = in.nextInt();
            startIndex= endIndex + 1;
            endIndex+= b;
            System.out.println("Batch starts from  "+ startIndex +"ends at"+endIndex);
            putBatch(map,startIndex,endIndex);

        }
        else {
            System.out.println("Please input the map entry");
            int a = in.nextInt();
            System.out.println("You entered integer "+a);
            put(map,a,a);
        }
    }
}

public static void putBatch(IMap map,int startIndex, int endIndex) {
    int index= startIndex;
    System.out.println("Start Index" + startIndex +"End Index"+endIndex );
    while(index<=endIndex){
        System.out.println("Map Values"+ index);
        put(map,index,index);
        index+=1;
    }

}

public static void put(IMap map,int key,int value) {
    map.set(key, value);
}

以下是执行此操作的步骤。

  1. 运行应用程序 A - Java 程序 RemoteMapJournalSourceSrv1

  2. 运行应用程序 B 节点 1 - Java 程序 RemoteMapJournalSourceCL1

  3. 运行应用程序 B 节点 2 - Java 程序 RemoteMapJournalSourceCL2

  4. 运行应用程序 A 的 Hazelcast 客户端 - Java 程序 HZClient

该客户端程序根据控制台输入将条目放入地图中。请提供整数输入。

观察结果

执行时,.peek() 会记录应用程序 B 的两个节点的值,并且在应用程序 A 映射中插入 1 个条目后列表计数变为 2。


您似乎正在从两个 Jet 客户端提交两个独立的作业。每个作业都会接收所有 IMap 事件日志项目并将它们推送到同一个 IList,因此 IList 的预期结果是包含每个项目的两个实例。

请记住,你只submit该作业来自 Jet 客户端,但它实际上在 Jet 集群内部同时在其所有成员上运行。如果您只想接收器中的一份数据副本,请不要提交相同的作业两次。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Hazelcast Jet 查询 的相关文章

随机推荐

  • 在android中使用标签设置和获取片段

    我用 viewpager 创建了一个选项卡布局 一切都很好 除了我需要在特定时刻运行一个方法 所以我需要获取片段实例并运行他们的方法 我是这样创建的 Override protected void onCreate Bundle saved
  • 使用 VsVim 搜索时突出显示会使代码不可读

    我已经使用 VsVim 一段时间了 我对它非常满意 然而 让我烦恼的一件事是当我在 VsVim 中进行任何类型的搜索时 文本的突出显示 例如使用 或标准搜索 我花了几个小时尝试更改 Visual Studios 选项菜单中的字体和颜色 但我
  • QT5:无法在 Windows 中加载 psql 驱动程序

    我想在 Windows 7 中加载 qt5 psql 驱动程序 我已经这样加载库 qDebug lt lt QCoreApplication libraryPaths QString driverName QPSQL QSqlDatabas
  • 移动网站的“链接到 App Store”小部件

    在过去的几个月里 我发现了几个使用某种 小部件 的网站 如果我使用 iPhone 打开具有此小部件的网站 它将显示指向 网站相关 iPhone 应用程序的链接 它看起来像一个简单的 div 包含应用程序名称 评级和链接 该小部件甚至 知道
  • 从 ViewPager 获取不同片段的值

    我正在开发一个应用程序 其中片段是动态生成的 private void setupViewPager ViewPager viewPager adapter new ViewPagerAdapter getSupportFragmentMa
  • Google Sheets:通过 Apps 脚本批量 getRangeByName

    是否可以获得多个按名称范围一通电话 我有一个复杂的函数 需要按名称获取多个范围 而 Spreadsheet getRangeByName name 会显着减慢我的脚本速度 有时这些调用大约需要 2 秒 有时单个调用可能需要大约 45 秒 限
  • svn https: “ra_serf: SSL 通信期间发生错误”

    我知道这与其他帖子的标题相同 但我搜索了又搜索 但找不到解决方案 我在所有机器上从 TortoiseSVN 1 6 升级到 1 8 4 我的主开发机器开始出现此错误 ra serf An error occurred during SSL
  • 具有任意属性的 SPARQL 属性路径查询

    SPARQL 属性路径 http www w3 org TR sparql11 query propertypaths任意长度的查询需要使用特定的属性 我想查询并查找从一个资源开始并以另一个资源结束的任何路径 例如 SELECT p WHE
  • 如何 Iterator::chain 迭代器向量?

    对于给定的一组迭代器 a b c 可以使用以下命令成功链接它们a chain b chain c 由于我尝试编写的 CLI 实用程序提供了路径向量 字符串 dirs a b c d e f 我想使用walkd dir在它们每个上 然后将它们
  • 更改node_modules位置

    有没有办法更改node modules文件夹位置 例如 dir1 dir2 node modules to dir1 dir2 node modules 以下是查看的代码node modules默认文件夹 Module prototype
  • 动作和动作监听器之间的区别

    有什么区别action and actionListener 我应该什么时候使用action versus actionListener 动作监听器 Use actionListener如果你想要一个钩子before真正的业务行动得到执行
  • 使用 Spring Security 进行单元测试

    我的公司一直在评估 Spring MVC 以确定我们是否应该在下一个项目中使用它 到目前为止 我喜欢我所看到的 现在我正在研究 Spring Security 模块 以确定它是否是我们可以 应该使用的东西 我们的安全要求非常基本 用户只需提
  • 如何使用 CodeModel 初始化二维数组

    我需要初始化一个二维数组 如下所示 Object someName param1 param2 param3 param4 param5 param6 我尝试过像 JExpression exp JExpr newArray codeMod
  • 多用户角色环回

    我正在尝试使用 Loopback 作为后端来制作一个应用程序 我以前已经使用过环回 但现在我想做一些我以前从未做过的事情 我想要的很简单 我将有 3 种类型的用户 管理员 服务者和默认用户 但是 我需要限制每种类型用户的访问控制 管理员可以
  • Rust 从 fn 返回结果错误:类型不匹配

    我希望这个函数返回一个错误结果 fn get result gt Result
  • 对图像进行积分的有效方法

    我有一个 2D 数组 典型大小约为 400x100 如图所示 它看起来像一个梯形 因为右下角的元素是 nan 对于数组中的每个元素 我想对多个元素 大约 10 个元素 沿列执行数值积分 在物理语言中 将颜色视为力的大小 我想找到通过计算 F
  • Material Design lite sidenav onhide 仅显示图标

    我正在尝试制作一个侧导航 当切换时 它不会完全隐藏侧导航 并会显示代表每个选项卡的图标 我的代码在这里 header mdl layout drawer border right 0 header mdl layout drawer mdl
  • 脚本通过 CentOS 安装 mysql-server,无需密码提示

    我的操作系统是 CentOS 6 6 我想知道如何通过 shell 脚本自动安装 mysql server 我发现有一个主题讨论了同样的问题 但在 CentOS 6 上失败了 ubuntu安装mysql无密码提示 https stackov
  • 长时间运行任务的视觉反馈

    我有一个长时间运行的 for each 循环 并且想知道是否有一种惯用的方法来添加一些视觉用户反馈 以便用户不会认为应用程序崩溃了 private void btnRunLongRunningTask Click object sender
  • Hazelcast Jet 查询

    我对 Hazelcast Jet 有以下疑问 用例如下 有一个应用程序 应用程序 A 部署在集群中 使用 Hazelcast IMDG 并将数百万条记录 事务放入 hazelcast IMap 中 已为此 IMap 配置事件日志 还有另一个