storm集成kafka简单使用示例2

2023-11-20

StormKafkaTopo.java

package stormUse.stormUse;

import java.util.Properties;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

public class StormKafkaTopo 
{

    public static void main(String[] args) throws Exception 
    { 
        // 配置Zookeeper地址
        BrokerHosts brokerHosts = new ZkHosts("192.168.153.233:2181");
        // 配置Kafka订阅的Topic,以及zookeeper中数据节点目录和名字
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "test", "/test" , "kafkaspout");

        // 配置KafkaBolt中的kafka.broker.properties
        Config conf = new Config();  

        //set producer properties.
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.153.233:9092");
        props.put("acks", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaBolt<String, String> bolt = new KafkaBolt<String, String>()
                .withProducerProperties(props)
                .withTopicSelector(new DefaultTopicSelector("topic2"))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>());


        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());  
        TopologyBuilder builder = new TopologyBuilder();   
        builder.setSpout("spout", new KafkaSpout(spoutConfig));  
        builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout"); 
        //builder.setBolt("kafkabolt", new KafkaBolt<String, Integer>()).shuffleGrouping("bolt");
        builder.setBolt("kafkabolt", bolt).shuffleGrouping("bolt");

        if (args != null && args.length > 0) 
        {  
            conf.setNumWorkers(3);  
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());  
        } else 
        {  
            LocalCluster cluster = new LocalCluster();  
            cluster.submitTopology("Topo", conf, builder.createTopology());  
            Utils.sleep(100000);  
            cluster.killTopology("Topo");  
            cluster.shutdown();  
        }  
    }
}

SenqueceBolt.java

package stormUse.stormUse;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class SenqueceBolt extends BaseBasicBolt
{

    public void execute(Tuple input, BasicOutputCollector collector) 
    {
        // TODO Auto-generated method stub
         String word = (String) input.getValue(0);  
         String out = "I'm " + word +  "!";  
         System.out.println("out=" + out);
         collector.emit(new Values(out));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) 
    {
        declarer.declare(new Fields("message"));
    }
}

MessageScheme.java

package stormUse.stormUse;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.List;

import org.apache.storm.spout.Scheme;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class MessageScheme implements Scheme {

     public List<Object> deserialize(ByteBuffer ser) 
     {

         Charset charset = null;  
         CharsetDecoder decoder = null;  
         CharBuffer charBuffer = null;  

         try 
         {
             charset = Charset.forName("UTF-8");  
             decoder = charset.newDecoder();  
             charBuffer = decoder.decode(ser.asReadOnlyBuffer());  
             String msg = charBuffer.toString(); 
             return new Values(msg);

         } catch (CharacterCodingException e) 
         {  

          }
            return null;
     }

     public Fields getOutputFields() {
            return new Fields("msg");  
        }

}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

storm集成kafka简单使用示例2 的相关文章

  • Spark 1.0.2(以及 1.1.0)挂在分区上

    我在 apache Spark 中遇到了一个奇怪的问题 希望得到一些帮助 从 hdfs 读取数据 并进行一些从 json 到对象的转换 后 下一阶段 处理所述对象 在处理 2 个分区 总共 512 个分区 后失败 这种情况发生在大型数据集上
  • 外部混洗:从内存中混洗大量数据

    我正在寻找一种方法来整理内存不适合的大量数据 大约 40GB 我有大约 3000 万个可变长度的条目 存储在一个大文件中 我知道该文件中每个条目的开始和结束位置 我需要对内存中不适合的数据进行洗牌 我想到的唯一解决方案是对包含以下数字的数组
  • 为什么布尔字段在 Hive 中不起作用?

    我的配置单元表中有一个数据类型为布尔值的列 当我尝试从 csv 导入数据时 它存储为 NULL 这是我的示例表 CREATE tABLE if not exists Engineanalysis EngineModel String Eng
  • Hive 执行钩子

    我需要在 Apache Hive 中挂钩自定义执行挂钩 如果有人知道该怎么做 请告诉我 我当前使用的环境如下 Hadoop Cloudera 版本 4 1 2 操作系统 Centos 谢谢 阿伦 有多种类型的挂钩 具体取决于您要在哪个阶段注
  • R bigglasso 结果与 hdm 或 glmnet 不匹配

    我一直在尝试使用 R 包 biglasso 来处理高维数据 但是 我得到的结果与我从 hdm 或 glmnet 获得的 LASSO 函数的结果不匹配 biglasso 的文档也很差 在下面的示例中 hdm 和 glmnet 的结果非常接近
  • 如何使用 Pig 从列中解析 JSON 字符串

    我有 tsv 日志文件 其中一列由 json 字符串填充 我想用以下内容解析该列JsonLoader in a Pig脚本 我看到很多例子JsonLoader用于每行只有一个 json 字符串的情况 我还有其他专栏想要跳过 但我不知道该怎么
  • 在 MATLAB 中处理大型 CSV 文件

    我必须处理一个最大 2GB 的大 CSV 文件 更具体地说 我必须将所有这些数据上传到 mySQL 数据库 但在我必须对此进行一些计算之前 所以我需要在 MATLAB 中完成所有这些操作 我的主管也想在 MATLAB 中完成 因为他熟悉MA
  • 如何从 HIVE 中的日期减去月份

    我正在寻找一种方法来帮助我从 HIVE 中的日期中减去月份 我有个约会2015 02 01 现在我需要从这个日期减去 2 个月 这样结果应该是2014 12 01 你们能帮我一下吗 select add months 2015 02 01
  • 使用 clojure-csv.core 解析巨大的 csv 文件

    到目前为止我有 require clojure csv core as csv require clojure java io as io def csv file getFile clojure java io resource verb
  • 分段读取 CSV 文件的策略?

    我的计算机上有一个中等大小的文件 4GB CSV 但没有足够的 RAM 来读取该文件 64 位 Windows 上为 8GB 在过去 我只是将其加载到集群节点上并将其读入 但我的新集群似乎任意将进程限制为 4GB RAM 尽管每台机器的硬件
  • Flink 中的水印和触发器有什么区别?

    我读到 排序运算符必须缓冲它接收到的所有元素 然后 当它接收到水印时 它可以对时间戳低于水印的所有元素进行排序 并按排序顺序发出它们 这是正确 因为水印表明不能有更多元素到达并与已排序元素混合 https cwiki apache org
  • 将 pandas 数据框中的行和上一行与数百万行进行比较的最快方法

    我正在寻找解决方案来加速我编写的函数 以循环遍历 pandas 数据帧并比较当前行和前一行之间的列值 例如 这是我的问题的简化版本 User Time Col1 newcol1 newcol2 newcol3 newcol4 0 1 6 c
  • 在nodejs中写入文件之前对数据流进行排序

    我有一个输入文件 可能包含最多 1M 条记录 每条记录如下所示 field 1 field 2 field3 n 我想读取这个输入文件并根据field3在将其写入另一个文件之前 这是我到目前为止所拥有的 var fs require fs
  • 如何在 Elasticsearch 中或在 Lucene 级别进行联接

    在 Elasticsearch 中执行相当于 SQL 连接的最佳方法是什么 我有一个包含两个大表的 SQL 设置 Persons 和 Items 一个人可以拥有many项目 人员和项目行都可以更改 即更新 我必须运行根据人和物品的各个方面进
  • 在 Spark 中,广播是如何工作的?

    这是一个非常简单的问题 在 Spark 中 broadcast可用于有效地将变量发送给执行器 这是如何运作的 更确切地说 何时发送值 我一打电话就发送broadcast 或者何时使用这些值 数据到底发送到哪里 发送给所有执行者 还是只发送给
  • 是否可以在表之间创建关系?

    Bigquery 看起来很棒 我有一个数据库类型 ETL 其中我的方案在实体之间有多种关系 我想知道是否有办法在它们之间建立关系 或者是否可以在数据集之间以某种方式模拟它们 请原谅我的英语 这不是我的语言 而且我不太了解它 您无法在 Big
  • 使用 big.matrix 对象计算欧几里德距离矩阵

    我有一个类对象big matrix in R有尺寸778844 x 2 这些值都是整数 公里 我的目标是使用以下公式计算欧几里德距离矩阵big matrix并因此得到一个类的对象big matrix 我想知道是否有最佳方法可以做到这一点 我
  • Hive ParseException - 无法识别“结束”“字符串”附近的输入

    尝试从现有 DynamoDB 表创建 Hive 表时出现以下错误 NoViableAltException 88 at org apache hadoop hive ql parse HiveParser IdentifiersParser
  • CSS3变换:翻译最大值?

    我创建了一个实验无限滚动 Pi 的前十亿位 https daniellamb com experiments infinite pi 寻找 创建一个具有大量数据集的高性能滚动解决方案 我开始测试iScroll http iscrolljs
  • 使用 awk 处理多个文件

    我必须使用 awk 处理大量 txt 文件 每个文件 1600 万行 我必须阅读例如十个文件 File 1 en sample 1 200 en n sample 2 10 en sample 3 10 File 2 en sample 1

随机推荐

  • 为什么越来越多的 IT 人考软考?

    近几年随着国家计算机与软件技术的发展 每年报名参加软考考试的人也越来越多 据工信部新闻发布会消息 计算机软件与通信专业技术人员职业资格考试累计报考人数超过485万 2022年报考人数129万人 01 为什么越来越多的IT人考软考证书 1 软
  • 【精品示例】超实用Python爬虫入门实例——做一个优质舔狗

    引言 最近发现了一个有意思的网站 里面充斥了大量的舔狗箴言 作为一个爬虫发烧友怎么能错过此等机会 咱们直接就是上才艺 类的编写 本次爬虫使用了多协程的方案进行 保证了爬虫的速度 在这里我们新建一个爬虫类 并在里边添加上我们需要的方法 网页的
  • IDEA打包上传到阿里云私服

    上传阿里云私服报错 ERROR Failed to execute goal org apache maven plugins maven deploy plugin 2 8 2 deploy default deploy on proje
  • 通讯录系统图形化界面(C++,Qt5.12)(Visual Studio2019,QtCreator)(初学)

    目录 无用的前言 无用的话 无需用看 前言 一 开发工具 二 功能演示以及 源码和安装包 下载 三 功能介绍以及设计思路 四 代码具体实现 项目文件结构 main cpp mainwindow ui mainwindow h mainwin
  • 2.前端笔记-CSS-字体属性

    1 字体系列 CSS使用font family属性定义文本的字体系列 body font family 思源黑体 Microsoft YaHei 建议 使用英文写字体的属性值 尽量使用系统默认自带字体 保证在任何用户的浏览器都可以显示 微软
  • react 入坑学习(十四)混合菜单新模式(ANT ProLayout)

    混合菜单新模式 样例 Ant Design Pro Blog 文档 这个明显就比非混合的好看很多 今天就来试试改一改吧 现在官网中找到ProLayout 就可以找到这个混合模式的源码样例 import React from react im
  • css实现文本超出显示省略号

    一 普通情况下 1 固定width 2 overflow hidden 3 text overflow ellipsis 显示为省略号 4 white space nowrap 不换行 二 table表格里 td 设置上面的4步 table
  • Selenium 之订制启动Chrome的选项(Options)

    使用 selenium 时 我们可能需要对 chrome 做一些特殊的设置 以完成我们期望的浏览器行为 比如阻止图片加载 阻止JavaScript执行 等动作 这些需要 selenium的 ChromeOptions 来帮助我们完成 1 什
  • 3.Open3D教程——点云数据操作

    点云数据 本教程阐述了基本的点云用法 随需要的文件链接 1 显示点云 import open3d as o3d import numpy as np print Load a ply point cloud print it and ren
  • ESDA in PySal (3):Geosilhouettes:集群拟合的地理测量

    ESDA in PySal 3 Geosilhouettes 集群拟合的地理测量 Silhouette statistics Rousseeuw 1987 是观测值与给定聚类的拟合优度的非参数度量 在聚类具有 地理 解释的情况下 例如当它们
  • 【Linux】进程优先级,环境变量,进程地址空间

    文章目录 1 进程优先级 基本概念 查看系统进程 PRI and NI PRI vs NI 修改进程优先级的命令 其他概念 2 环境变量 基本概念 查看环境变量方法 常见环境变量 测试PATH 环境变量相关的命令 环境变量的组织方式 通过代
  • 心理学的166个现象---之六

    101 拍球效应 拍篮球时 用的力越大 篮球就跳得越高 对学生的期望值越高 学生潜能的发挥就越充分 优秀的老师总是尽可能地信任学生 不断鼓励学生 而批评则尽可能委婉 不使矛盾激化 102 旁观者效应 1993年 四川达竹矿务局一名高考超过录
  • pytorch模型训练的若干问题

    1 Net input 调用的是什么函数 为什么直接写对象名就直接调用函数了 net是创建的vgg类的对象 vgg类继承于pytorch库中类nn Module 创建类时的括号里写上父类的名字 就是继承的意思 在pytorch库中nn Mo
  • QTableWidget 设置表头颜色

    QTableWidget 设置表头颜色 方法1 setStyleSheet QHeaderView section background color qlineargradient x1 0 y1 0 x2 0 y2 1 stop 0 00
  • android sdk自带的fragment标签使用

    项目开发中要用到 下面四个大分类 上面三个小分类的情况 大分类采用viewPage 小分类 使用了sdk自带的
  • 制造业软件体系结构与互联网的差异

    本人自毕业已经13年 虽然热爱计算机 但是由于种种原因 一直在东莞的工厂混迹 感受着互联网的大潮 也不免有几分失落 伴随这去年 今年大厂裁人 许多被逼无路的程序员开始跳槽制造业 浓浓的Java气息来了 在此不免吐槽一句 请不要把写互联网程序
  • ESP32-PICO-D4下载程序出现 rst:0x10 (RTCWDT_RTC_RESET),boot:0x13 (SPI_FAST_FLASH_BOOT) flash read err, 1000

    备注 是我自己记录用的 有问题可以交流 用的Visual Studio Code Arduino platformio开发 最近现在在搞物联网 发现ESP32这款芯片容易上手 而且功能强大 买的开发板用起来很顺手 于是我就自己从立创开源上找
  • 解决cannot be cast to class jakarta.servlet.Servlet问题

    我的Tomcat版本是10 0 5 这个问题的主要原因是因为 10版本的Tomcat的servlet包变化了 解决问题方法 IDEA选择这个直接完美解决 IDEA选择这个直接完美解决 IDEA选择这个直接完美解决 1下载对应的包并且导入 下
  • Prim算法解决修路问题

    普里姆算法 Prim算法 图论中的一种算法 可在加权连通图里搜索最小生成树 意即由此算法搜索到的边子集所构成的树中 不但包括了连通图里的所有顶点 英语 Vertex graph theory 且其所有边的权值之和亦为最小 普里姆算法和Kru
  • storm集成kafka简单使用示例2

    StormKafkaTopo java package stormUse stormUse import java util Properties import org apache storm Config import org apac