Storm Spout 未收到 Ack

2024-05-24

我已经开始使用storm,所以我使用创建简单的拓扑本教程 https://github.com/nathanmarz/storm/wiki/Tutorial

当我运行我的拓扑时LocalCluster一切看起来都很好, 我的问题是我没有得到元组的确认,这意味着我的喷嘴ack从未被调用过。

我的代码如下 - 你知道为什么吗ack不叫?

所以我的拓扑看起来像这样

public StormTopology build() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(HelloWorldSpout.class.getSimpleName(), 
             helloWorldSpout, spoutParallelism);

        HelloWorldBolt bolt = new  HelloWorldBolt();

        builder.setBolt(HelloWorldBolt.class.getSimpleName(), 
                   bolt, boltParallelism)
              .shuffleGrouping(HelloWorldSpout.class.getSimpleName());
}

我的喷嘴看起来像这样

public class HelloWorldSpout  extends BaseRichSpout implements ISpout {
    private SpoutOutputCollector collector;

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("int"));
    }

    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        this.collector = collector;
    }

    private static Boolean flag = false;
    public void nextTuple() {
        Utils.sleep(5000);

            //emit only 1 tuple - for testing
        if (!flag){
            this.collector.emit(new Values(6));
            flag = true;
        }
    }

    @Override
    public void ack(Object msgId) {
        System.out.println("[HelloWorldSpout] ack on msgId" + msgId);
    }

    public void fail(Object msgId){
        System.out.println("[HelloWorldSpout] fail on msgId" + msgId);
    }
}

我的螺栓看起来像这样

@SuppressWarnings("serial")
public class HelloWorldBolt extends BaseRichBolt{
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, 
                    OutputCollector collector) {
        this.collector = collector;
        logger.info("preparing HelloWorldBolt");
    }

    public void execute(Tuple tuple) {
        System.out.println("[HelloWorldBolt] got" + tuple.getInteger(0));
        this.collector.ack(tuple);
    }

    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub

    }
}

spout 中的 emit() 方法只有一个参数,因此该元组没有锚定。这就是为什么即使您在 Bolt 中确认了元组,也没有在 Spout 中回调 ack() 方法。

为了让它工作,你需要修改你的 spout 以发出第二个参数,即消息 id。正是这个 id 被传回 spout 中的 ack() 方法:

public void nextTuple() {
    Utils.sleep(5000);

        //emit only 1 tuple - for testing
    if (!flag){
        Object msgId = "ID 6";  // this can be any object
        this.collector.emit(new Values(6), msgId);
        flag = true;
    }
}


@Override
public void ack(Object msgId) {
    //  msgId should be "ID 6"
    System.out.println("[HelloWorldSpout] ack on msgId" + msgId);
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Storm Spout 未收到 Ack 的相关文章

随机推荐

  • R 彩色树状图建议?

    我想制作彩色树状图 但尚未找到足够的库 http addictedtor free fr graphiques RGraphGallery php graph 79 http addictedtor free fr graphiques R
  • 我为什么要学习 Lisp? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 我可以关闭并重新打开套接字吗?

    我学习了一个使用套接字的例子 在此示例中 客户端向服务器发送请求以打开套接字 然后服务器 侦听特定端口 打开套接字 一切都很好 套接字从双方 客户端和服务器 打开 但我仍然不清楚这个东西有多灵活 例如 客户端是否可以关闭一个打开的 从两端
  • 在 PHP 上发送不带 SMTP 标头的 SMS

    我正在尝试使用以下对我有用的代码通过 PHP 发送短信验证码 但我越来越 email protected cdn cgi l email protecti
  • 使用MVVM时如何将事件参数作为interaction.Trigger中的参数传递?

    基本上我的自定义类中有一个事件 我将使用事件的参数 gt 属性作为该方法的参数来调用自定义类中的特定方法 您可以观察此信息背后的实际代码 instance FileOpening sender e gt CustomClass Method
  • Meteor JS:存储特定模板实例状态的最佳方法是什么?

    我正在学习 Meteor JS 中的会话和反应式数据源 它们非常适合设置全局 UI 状态 但是 我不知道如何将它们的范围限制到模板的特定实例 这就是我想做的 我的页面上有多个可内容编辑的元素 每个下面都有一个 编辑 按钮 当用户单击 编辑
  • Python:使用for循环更改变量后缀

    我知道这个问题被问了很多 但到目前为止我无法使用 理解答案 我想改变for循环中变量的后缀 我尝试了 stackoverflow 搜索提供的所有答案 但很难理解提问者经常提出的具体代码 因此 为了清楚起见 我使用一个简单的示例 这并不意味着
  • Xamarin.Forms 用相机拍照显示方向错误并且后退按钮崩溃

    我正在使用此处的 Xamarin Forms Camera 示例 https github com XForms Xamarin Forms Labs Samples tree master XF Labs CameraSample htt
  • Nodejs 中的 tail-stream 模块不打印文件的最后一条记录

    我正在使用 tail stream 从 csv 文件获取数据 并将每个 csv 记录转换为 json 格式并打印它 但是尾流不会打印文件的最后一行 而是将其保留为缓冲区 如果我更新文件 则从上一个最后一行 缓冲的最后一行 到更新的最后一行
  • 是否有标准键盘快捷键可以在 Visual Studio 中构建当前项目?

    I know that Ctrl Shift B launches a solution build but I would like a shortcut that just builds the current project Is a
  • 单击保存文件

    我希望能够通过单击下载 csv 文件 而不是在浏览器中打开 我把这段代码 a href file csv download file a 但单击它会在浏览器中打开 v 文件 在本地主机中 当我单击链接时 它正在下载 但在服务器上时 它在浏览
  • perl - 子进程向父进程发送信号

    我编写了以下代码来测试孩子和父母之间的信号传递 理想情况下 当子进程向父进程发出 SIGINT 时 父进程应该在新的迭代中返回并等待用户输入 我在 perl 5 8 中观察到了这一点 但在 perl 5 6 1 我被要求使用 中 父级实际上
  • 链表中的虚拟节点

    问 什么时候使用它们 作业问题 列表中的第一个和最后一个节点 有时用作列表中的第一个和最后一个节点 从未用作列表中的第一个和最后一个节点 维基百科说 哨兵节点是与链接一起使用的专门指定的节点 列表和树作为遍历路径终止符 哨兵节点的作用是 不
  • 交换两个文本框的值

    我有两个文本框值 var pickup txt pickup var destination txt destination 我想交换这两个值 如下所示 pickup val destination val destination val
  • Java .split("|") 不工作

    我刚刚遇到了一个问题分割法 http docs oracle com javase 6 docs api java lang String html split 28java lang String 29for 字符串不适用于字符 作为一个
  • 延迟 HTML5:无效伪类直到第一个事件发生

    我最近发现 invalid伪类适用于required页面加载后立即生成表单元素 例如 如果您有以下代码
  • 云函数 onUpdate:无法读取未定义的属性“forEach”

    现在我正在尝试更新我的项目中的图片 我可以更新云火商店中的图片网址 但我也想使用 firebase 云功能从云存储中删除上一张图片 我想要实现的是 当我上传新图片时 从云存储中删除以前的图片 This is my data structur
  • 处理“未找到细胞”。 Excel 中的错误

    我正在使用 Excel VSTO 应用程序并使用以下代码在工作表中查找错误单元格 Excel Range rngTemp Excel Range rngErrorRange Excel Worksheet Sheet1 Excel Work
  • r - ggplot2 - 突出显示选定的点和奇怪的行为

    我想突出显示选定的点并遇到一些奇怪的行为 首先是一些虚拟数据 a lt 1 50 b lt rnorm 50 mydata lt data frame a a b b ggplot mydata aes x a y b geom point
  • Storm Spout 未收到 Ack

    我已经开始使用storm 所以我使用创建简单的拓扑本教程 https github com nathanmarz storm wiki Tutorial 当我运行我的拓扑时LocalCluster一切看起来都很好 我的问题是我没有得到元组的