SparkStreaming结合Kafka使用

2023-05-16

spark自带的example中就有streaming结合kafka使用的案例:

$SPARK_HOME/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala

使用方法参见代码描述:


Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
<zkQuorum> is a list of one or more zookeeper servers that make quorum
<group> is the name of kafka consumer group
<topics> is a list of one or more kafka topics to consume from
<numThreads> is the number of threads the kafka consumer should use

Example:
`$ bin/run-example \
org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \
my-consumer-group topic1,topic2 1`  

 运行步骤:

1、启动ZK


zkServer.sh start  

2、启动KAFKA SERVER


kafka-server-start.sh  $KAFKA_HOME/config/server.properties &    

3、运行Producer


run-example org.apache.spark.examples.streaming.KafkaWordCountProducer hadoop000:9092 test 3 5  

参数描述:

  hadoop000:9092表示producer的地址和端口;

  test表示topic;

  3表示每秒发多少条消息;

  5表示每条消息中有几个单词;

4、运行Consumer


run-example org.apache.spark.examples.streaming.KafkaWordCount hadoop000:2181 test-consumer-group test 1  

参数描述:

  hadoop000:2181表示zookeeper的监听地址;

  test-consumer-group表示consumer-group的名称,必须和$KAFKA_HOME/config/consumer.properties中的group.id的配置内容一致;

  test表示topic;

  1表示线程数;

注意观察consumer控制台的数据输出,类似于下面的输出:


-------------------------------------------
Time: 1410483028000 ms
-------------------------------------------
(4,467)
(8,463)
(6,467)
(0,475)
(2,401)
(7,464)
(5,447)
(9,419)
(3,430)
(1,452)  

 

 

注意:

1、运行该案例的时候不需要启动spark;

2、我已经将$KAFKA_HOME/bin和$SPARK_HOME/bin添加到系统环境变量中,故在任意路径均可以执行运行步骤的脚本,如果没配置到环境变量,需要指定路径再执行脚本。

 


参考许鹏博客

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

SparkStreaming结合Kafka使用 的相关文章

随机推荐

  • Linux查看用户数、登录用户

    如果是系统中全部只要默认shell是bash的就包括那么二楼正解 xff0c 就是cat etc passwd grep bash wc l 如果是正在登陆系统的账户中使用bash shell的 xff0c 那么ps ef grep bas
  • ArchLinux安装常用软件

    安装完Arch操作系统后的一些常用软件安装 首先同步下载源 xiaomo 64 Arch XIAOMO sudo pacman Sy 安装vim xiaomo 64 Arch XIAOMO echo e 34 n 34 sudo pacma
  • cmd美化

    原本的cmd 虽然原本的cmd很简约黑底白字 xff0c 但是看久了也会视觉疲劳 美化 xff08 丑化 xff09 打开cmd右键头部选择属性 字体选项这里可以修改字体的大小和选择字体 xff0c 修改之后下方会有预览 xff0c 颜色选
  • Got timeout reading communication packets解决方法

    Got timeout reading communication packets解决方法 http www th7 cn db mysql 201702 225243 shtml Note Aborted connection xxxx
  • java在数字前面自动补零的方法

    将元数据前补零 xff0c 补后的总长度为指定的长度 xff0c 以字符串的形式返回 64 param sourceDate 64 param formatLength 64 return 重组后的数据 public static Stri
  • MariaDB命令详解

    MariaDB命令详解 mysql客户端程序 xff1a 命令行交互式客户端程序 xff1a mysql mysql mysql OPTIONS database mysql help 配置文件的读取次序 xff1a etc mysql m
  • python文本文件操作诗句给上一句输出下一句_[Python] 自动化办公 定制微信每日一句诗...

    转载请注明 xff1a 陈熹 chenx6542 64 foxmail com 简书号 xff1a 半为花间酒 若公众号内转载请联系公众号 xff1a 早起Python 这篇文章能学到的主要内容 xff1a 利用 喵提醒 推送消息至微信 x
  • 我的年终总结,作为研发,在2018年都有哪些进步、收获与成长?

    2018 结束了 部门开会总结了过去的工作与未来的展望 xff0c 也是个不错的机会去回顾 审视 思考自己的 2018 年 玄难说过人与人的差距来自于思考与总结 xff0c 我深深地认同这一点 我也把自己的一部分思考写下来 xff0c 在公
  • Arch无法更新和安装软件

    今天用户yay明来安装htop时 xff0c 一直卡死在以下输出内容出 xff1a db lck is present Waiting 更新软件源也出现以下故障 xff1a sudo pacman Syy sudo ivan 的密码 xff
  • 云主机的硬盘IO性能比较

    测试方式 因为工作等需要 xff0c 手里有一堆云主机 xff0c 前几天忽然想到来测试对比一下各家的IO性能如何 测试方法不严谨 xff0c 仅供参考 测试工具为fio xff0c 测试命令如下 xff08 以sync方式为例 xff09
  • 定制小狼豪(五笔+拼音)输入法

    小狼毫输入法是一个给程序员折腾的输入法 xff0c 可以自由定制 rime是一个输入法框架 xff0c 小狼毫是在windows平台上的名称 相关教程和下载 xff1a https jianguoyun com p DRylhFMQv 3j
  • 10.12 firewalld和netfilter

    2019独角兽企业重金招聘Python工程师标准 gt gt gt Linux防火墙 netfilter selinux临时关闭 setenforce 0selinux永久关闭 vi etc selinux configcentos7之前使
  • 使用 build-simple-cdd 快速定制 Debian 安装盘

    为什么80 的码农都做不了架构师 xff1f gt gt gt 官方推荐了 build simple cdd 来 定制Debian安装盘 sudo apt get y install simple cdd xorriso 创建基础目录和文件
  • PostSharp-5.0.26安装包_KeyGen发布_支持VS2017

    PostSharp 5 0 26安装包 KeyGen发布 支持VS2017 请低调使用 PostSharp安装及注册步骤截图 rar 请把浏览器主页设置为以下地址支持本人 https www duba com un 454974 16968
  • centos7 Firewall防火墙开启80端口

    为什么80 的码农都做不了架构师 xff1f gt gt gt centos7 默认是FirewallD 提供支持网络 防火墙区域 zone 定义网络链接以及接口安全等级的动态防火墙管理工具 xff0c 利用FirewallD开启80端口操
  • 安卓6.0系统权限问题android.permission.WRITE_SETTINGS

    关于 Android permission WRITE SETTINGS 的权限 xff0c 申请 xff0c 判断 精简代码如下 xff1a if Build VERSION SDK INT gt 61 Build VERSION COD
  • js match函数注意

    match函数 String prototype match 参数 regexp 返回 返回包含所有匹配的数组 xff0c 如果匹配失败返回Null 数组第一项是整段字符串的匹配 xff0c 第二项至以后都是捕获匹配 注意 需要注意的是 x
  • VR发展简史

    最初的起源 事实上 xff0c 虚拟现实由来已久 xff0c 其概念最早被提及应该追溯到Aldous Huxley xff08 阿道司 赫胥黎 xff09 1932年推出的长篇小说 美丽新世界 xff0c 这篇小说以26世纪为背景 xff0
  • crontab 每月执行一次怎么写? - Linux系统管理 - ChinaUnix.net -

    crontab 每月执行一次怎么写 xff1f Linux系统管理 ChinaUnix net 0 19 1 bin sh xxx sh 每个月的1号的19点钟运行xxx sh 分钟 小时 日子可以更改 xff0c 后两项为 就是month
  • SparkStreaming结合Kafka使用

    spark自带的example中就有streaming结合kafka使用的案例 xff1a SPARK HOME examples src main scala org apache spark examples streaming Kaf