sparkStreaming对接kafka

2023-11-15

ReceiverAPI:需要一个专门的Executor去接收数据,然后发送给其他的Executor做计算。存在的问题,接收数据的Executor和计算的Executor速度会有所不同,特别在接收数据的Executor速度大于计算的Executor速度,会导致计算数据的节点内存溢出。

DirectAPI:是由计算的Executor来主动消费Kafka的数据,速度由自身控制。

Kafka 0-8 Receive模式

1) 需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印到控制台。
2)导入依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.1</version>
</dependency>

2) 编写代码
0-8Receive模式,offset维护在zk中,程序停止后,继续生产数据,再次启动程序,仍然可以继续消费。可通过get /consumers/bigdata/offsets/主题名/分区号 查看

object Spark04_ReceiverAPI {
  def main(args: Array[String]): Unit = {

    //1.创建SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("Spark04_ReceiverAPI").setMaster("local[*]")

    //2.创建StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    //3.使用ReceiverAPI读取Kafka数据创建DStream
    //在定义Kafka参数的时候,应注意添加的是zk集群信息还是kafka集群信息
    val kafkaDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,
      "hadoop202:2181,hadoop203:2181,hadoop204:2181",
      "bigdata",
      //v表示的主题的分区数
      Map("mybak" -> 2))

    //4.计算WordCount并打印        new KafkaProducer[String,String]().send(new ProducerRecord[]())
    val lineDStream: DStream[String] = kafkaDStream.map(_._2)
    val word: DStream[String] = lineDStream.flatMap(_.split(" "))
    val wordToOneDStream: DStream[(String, Int)] = word.map((_, 1))
    val wordToCountDStream: DStream[(String, Int)] = wordToOneDStream.reduceByKey(_ + _)
    wordToCountDStream.print()

    //5.开启任务
    ssc.start()
    ssc.awaitTermination()
  }
}

打开kafka集群,输入

[wt@hadoop02 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop02:9092 --topic mybak

Kafka 0-8 Direct模式

1)需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印到控制台。
2)导入依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.1</version>
</dependency>

3) 编写代码(自动维护offset1)
offset维护在checkpoint中,但是获取StreamingContext的方式需要改变,目前这种方式会丢失消息

object Spark05_DirectAPI_Auto01 {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("Spark05_DirectAPI_Auto01").setMaster("local[*]")

    //2.创建StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    ssc.checkpoint("D:\\dev\\workspace\\my-bak\\spark-bak\\cp")

    //3.准备Kafka参数
    val kafkaParams: Map[String, String] = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop202:9092,hadoop203:9092,hadoop204:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "bigdata"
    )

    //4.使用DirectAPI自动维护offset的方式读取Kafka数据创建DStream
    val kafkaDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,
      kafkaParams,
      Set("mybak"))

    //5.计算WordCount并打印
    kafkaDStream.map(_._2)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    //6.开启任务
    ssc.start()
    ssc.awaitTermination()
  }
}

4) 编写代码(自动维护offset2)
offset维护在checkpoint中,获取StreamingContext为getActiveOrCreate
这种方式缺点:
 checkpoint小文件过多
 checkpoint记录最后一次时间戳,再次启动的时候会把间隔时间的周期再执行一次


object Spark06_DirectAPI_Auto02 {

  def main(args: Array[String]): Unit = {
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("D:\\dev\\workspace\\my-bak\\spark-bak\\cp", () => getStreamingContext)

    ssc.start()
    ssc.awaitTermination()
  }

  def getStreamingContext: StreamingContext = {
    //1.创建SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("DirectAPI_Auto01").setMaster("local[*]")
    
    //2.创建StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("D:\\dev\\workspace\\my-bak\\spark-bak\\cp")
    
    //3.准备Kafka参数
    val kafkaParams: Map[String, String] = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop202:9092,hadoop203:9092,hadoop204:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "bigdata"
    )
    
    //4.使用DirectAPI自动维护offset的方式读取Kafka数据创建DStream
    val kafkaDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,
      kafkaParams,
      Set("mybak"))
    
    //5.计算WordCount并打印
    kafkaDStream.map(_._2)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()
    
    //6.返回结果
    ssc
  }
}

5) 编写代码(手动维护offset)

object Spark07_DirectAPI_Handler {

def main(args: Array[String]): Unit = {

//1.创建SparkConf
val conf: SparkConf = new SparkConf().setAppName("DirectAPI_Handler").setMaster("local[*]")

//2.创建StreamingContext
val ssc = new StreamingContext(conf, Seconds(3))

//3.创建Kafka参数
val kafkaParams: Map[String, String] = Map[String, String](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop202:9092,hadoop203:9092,hadoop204:9092",
  ConsumerConfig.GROUP_ID_CONFIG -> "bigdata"
)

//4.获取上一次消费的位置信息
val fromOffsets: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long](
  TopicAndPartition("mybak", 0) -> 13L,
  TopicAndPartition("mybak", 1) -> 10L
)

//5.使用DirectAPI手动维护offset的方式消费数据
val kafakDStream: InputDStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
  ssc,
  kafkaParams,
  fromOffsets,
  (m: MessageAndMetadata[String, String]) => m.message())

//6.定义空集合用于存放数据的offset
var offsetRanges = Array.empty[OffsetRange]

//7.将当前消费到的offset进行保存
kafakDStream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  for (o <- offsetRanges) {
    println(s"${o.fromOffset}-${o.untilOffset}")
  }
}

//8.开启任务
ssc.start()
ssc.awaitTermination()

}
}


## Kafka 0-10 Direct模式

1)需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印到控制台。
2)导入依赖,为了避免和0-8冲突,我们新建一个module演示

org.apache.spark spark-core_2.11 2.1.1 org.apache.spark spark-streaming_2.11 2.1.1 org.apache.spark spark-streaming-kafka-0-10_2.11 2.1.1 ```

3)编写代码

object Spark01_DirectAPI010 {
  def main(args: Array[String]): Unit = {

    //1.创建SparkConf
    val conf: SparkConf = new SparkConf().setAppName("DirectAPI010").setMaster("local[*]")

    //2.创建StreamingContext
    val ssc = new StreamingContext(conf, Seconds(3))

    //3.构建Kafka参数
    val kafkaParmas: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "bigdata191122",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    //4.消费Kafka数据创建流
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("test"), kafkaParmas))

    //5.计算WordCount并打印
    kafkaDStream.map(_.value())
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    //6.启动任务
    ssc.start()
    ssc.awaitTermination()

  }

}

消费Kafka数据模式总结

 0-8 ReceiverAPI:

  1. 专门的Executor读取数据,速度不统一
  2. 跨机器传输数据,WAL
  3. Executor读取数据通过多个线程的方式,想要增加并行度,则需要多个流union
  4. offset存储在Zookeeper中
     0-8 DirectAPI:
  5. Executor读取数据并计算
  6. 增加Executor个数来增加消费的并行度
  7. offset存储
    a) CheckPoint(getActiveOrCreate方式创建StreamingContext)
    b) 手动维护(有事务的存储系统)
    c) 获取offset必须在第一个调用的算子中:offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
     0-10 DirectAPI:
  8. Executor读取数据并计算
  9. 增加Executor个数来增加消费的并行度
  10. offset存储
    i. a.__consumer_offsets系统主题中
    ii. b.手动维护(有事务的存储系统)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

sparkStreaming对接kafka 的相关文章

  • Spark(七)——累加器和广播变量

    5 累加器 通过在驱动器中调用SparkContext accumulator initialValue 方法 创建出存有初始值的累加器 返回值为org apache spark Accumulator T 对象 其中 T 是初始值 ini
  • Spark性能调优之Shuffle调优

    Spark性能调优之Shuffle调优 Spark底层shuffle的传输方式是使用netty传输 netty在进行网络传输的过程会申请堆外内存 netty是零拷贝 所以使用了堆外内存 shuffle过程中常出现的问题 常见问题一 redu
  • spark-shell 加载本地文件报错 java.io.FileNotFoundException

    学习spark shell 时候发现一个问题 从本地文件加载数据生成RDD 报错 文件找不到 原因 spark shell 如果启动了集群模式 真正负责计算的executor会在 该executor所在的 worker节点上读取文件 并不是
  • 大数据面试题Spark篇(1)

    目录 1 spark数据倾斜 2 Spark为什么比mapreduce快 3 hadoop和spark使用场景 4 spark宕机怎么迅速恢复 5 RDD持久化原理 6 checkpoint检查点机制 7 checkpoint和持久化的区别
  • pyspark 连接远程hive集群配置

    今天本地spark连接远程hive集群 直接把配置导入进去 本地直接应用远程环境 1 安装spark 设置spark环境变量 2 拿到远程集群配置文件 将配置文件放在spark conf 目录下 xml 一共五个文件 3 将mysql co
  • cdh下spark2-yarn运行sparkstreaming获取kafka数据使用spark-streaming-kafka-0-10_2.11报错解决

    报错问题 20 07 15 17 20 51 INFO utils AppInfoParser Kafka version 0 9 0 kafka 2 0 0 20 07 15 17 20 51 INFO utils AppInfoPars
  • scala和spark的下载与安装

    简易安装scala和spark 一 安装scala 1 安装scala scala下载注意和jdk的版本号 下载地址 https www scala lang org download 2 上传到linux虚拟机里 可通过rz方式上传 上传
  • Compressed Sparse Column format(CSC)

    CSR Compressed Sparse Row format 和CSC Compressed Spare Column format 都是一种稀疏矩阵的存储格式 这里分别给出实例 假设有如下矩阵 1360
  • Kafka/Spark消费topic到写出到topic

    1 Kafka的工具类 1 1 从kafka消费数据的方法 消费者代码 def getKafkaDStream ssc StreamingContext topic String groupId String consumerConfigs
  • Hadoop完全分布式集群——Hadoop 配置

    前面已完成VMware虚拟机安装与配置 参考前一篇Hadoop完全分布式集群 VMware虚拟机安装与配置 夏雨和阳阳的博客 CSDN博客 下面将进行Hadoop 配置 一 slave1 slave2节点配置修改 slave1 slave2
  • spark报Got an error when resolving hostNames. Falling back to /default-rack for all

    一 报错代码如下 21 06 01 20 13 36 INFO yarn SparkRackResolver Got an error when resolving hostNames Falling back to default rac
  • Spark Job写文件个数的控制以及小文件合并的一个优化

    文章目录 背景说明 通过引入额外Shuffle对写入数据进行合并 EnsureRepartitionForWriting Rule CoalesceShufflePartitions Rule OptimizeShuffleWithLoca
  • Impala presto hbase hive sparksql

    Impala 技术点梳理 http www cnblogs com TiestoRay p 10243365 html Impala 优点 实时性查询 计算的中间结果不写入磁盘 缺点 对于内存的依赖过于严重 内存溢出直接导致技术任务的失败
  • spark_hadoop集群搭建自动化脚本

    bin bash 脚本使用说明 1 使用脚本前需要弄好服务器的基础环境 2 在hadoop的每个节点需要手动创建如下目录 data hdfs tmp 3 修改下面的配置参数 4 脚本执行完备后需要收到格式化namenode
  • 使用Flink1.16.0的SQLGateway迁移Hive SQL任务

    使用Flink的SQL Gateway迁移Hive SQL任务 前言 我们有数万个离线任务 主要还是默认的DataPhin调度CDP集群的Hive On Tez这种低成本任务 当然也有PySpark 打Jar包的Spark和打Jar包的Fl
  • 2020-10-24 大数据面试问题

    上周面试数据开发职位主要从公司的视角讲一下记录下面试流水 1 三面技术一轮hr 面到了cto 整体来看是这一周技术含量最高信息量最大的一个 1到4轮过了4个小时 技术上的问题主要问的对数据分层的理解 1 一面自我介绍 目前团队的规模多大 2
  • Spark常用参数解释

    Spark的默认配置文件位于堡垒机上的这个位置 SPARK CONF DIR spark defaults conf 用户可以自行查看和理解 需要注意的是 默认值优先级最低 用户如果提交任务时或者代码里明确指定配置 则以用户配置为先 用户再
  • JAVA 安装与简单使用

    JAVA简易安装 下载安装 环境变量 进入变量界面 设置变量 验证JAVA环境 运行Java程序 个人站 ghzzz cn 还在备案 很快就能访问了 下载安装 第一步当然是从官网下载安装java了 网上有很多的教程 这里简单的写一下 在这里
  • 2023_Spark_实验二十九:Flume配置KafkaSink

    实验目的 掌握Flume采集数据发送到Kafka的方法 实验方法 通过配置Flume的KafkaSink采集数据到Kafka中 实验步骤 一 明确日志采集方式 一般Flume采集日志source有两种方式 1 Exec类型的Source 可
  • spark相关

    提示 文章写完后 目录可以自动生成 如何生成可参考右边的帮助文档 文章目录 前言 一 pandas是什么 二 使用步骤 1 引入库 2 读入数据 总结 前言 提示 这里可以添加本文要记录的大概内容 例如 随着人工智能的不断发展 机器学习这门

随机推荐

  • win7可关闭服务

    Adaptive brightness 如果你没有使用触摸屏一类的智能调节屏幕亮度的设备 该功能就可以放心禁用 ApplicationLayer Gateway Service 为Internet连接共享提供第三方协议插件的支持 Appli
  • 数码管时钟显示按键控制 fpga实现

    目录 原理 实现 原理 数码管原理 并且该数码管是共阳极 所以段选低电平有效 片选低电平有效 段选共用8个引脚 实现 思路 分三个模块实现 1 按键消抖 按键消抖 2 计数器模块 产生需要的时钟信号数据 3 数码管驱动 将输入的数据转换为段
  • BIG Endian 和 Little Endian(small endian)模式的区别

    BIG Endian 和 Little Endian small endian 模式的区别 谈到字节序的问题 必然牵涉到两大CPU派系 那就是Motorola的PowerPC系列CPU和Intel的x86系列CPU PowerPC系列采用b
  • QT 打开指定目录并选中指定文件

    目录 方法一 使用Qt自带的方法 方法二 使用windows自带工具 有时自动生成文件之后 点击某个按钮我们希望能够自动跳转到文件所在目录 打开之后不依附于运行程序 可能还需要选中该文件 环境 win10 Qt5 9 6 MinGW 方法一
  • 江苏大学计算机学院林琳,计算机学院教师岗副高及以下、其它专技中级及以下人员岗位聘用结果公示...

    副教授五级 6人 按姓名拼音排序 下同 毕建良 李峰 林庆 毛启容 王良民 朱利平 副教授六级 14人 陈伟鹤 韩飞 李莉 李星毅 潘雨青 钱少先 王洪金 熊书明 杨利霞 曾兰玲 赵念强 周从华 朱小龙 邹志文 副教授七级 13人 蔡涛 陈
  • Docker启动一个Centos镜像

    搜索可用的centos的docker镜像 docker search
  • 第三届国际金融科技论坛开幕,神州信息专家参与蓉城“论道”

    10月30日至31日 由西南财经大学 加州大学伯利克分校国际风险数据分析联盟 成都市地方金融监督管理局联合主办的 第三届国际金融科技论坛 SWUFE CDAR 2020 在成都举行 神州信息金融战略本部副总裁潘志江 神州信息金融科技首席风控
  • google 图片下载

    def xia url headers headers user agent Mozilla 5 0 Windows NT 10 0 WOW64 AppleWebKit 537 36 KHTML like Gecko Chrome 78 0
  • Cadence 17.4 使用TIPS: Orcad 输出PDF

    首先File gt Export gt PDF PDF Export 设置页面 其中有4个输出工具供选择 此处我选择第一个Acrobat Distiller 这个是电脑里安装了咱们常用的Adobe Acrobat DC 就会自带的程序 如果
  • 线性分组码最小汉明距离_信息与编码系列(六)线性码~线性代数

    目录 序 线性码的矩阵描述 线性码的等价性 线性码的最小距离 标准数组 Standard Array 校验子解码 Syndrome Decoding 序 这篇文章相当于做一篇 索引 将线性代数的东西和线性码对应起来 方便日后出现问题能够快速
  • jsp调用服务器上的其他程序(C程序)

    String area dz String req getParameter area String id dz String req getParameter id String ip 10 xxx x xx String encodeS
  • SAM-DETR学习笔记Accelerating DETR Convergence via Semantic-Aligned Matching

    Abstract 最近开发的DEtection TRansformer DETR 通过消除一系列手工制作的组件 建立了一个新的对象检测范式 然而 DETR的收敛速度非常慢 这大大增加了培训成本 我们观察到 慢收敛主要归因于在不同特征嵌入空间
  • dropout层

    深度神经网 DNN 中经常会存在一个常见的问题 模型只学会在训练集上分类 过拟合现象 dropout就是为了减少过拟合而研究出的一种方法 一 简介 当训练模型较大 而训练数据很少的话 很容易引起过拟合 一般情况我们会想到用正则化 或者减小网
  • EIGamal数字签名的实现(c++)——大三密码学实验

    实验原理 1 密钥产生 Alice要对一个消息签名 她选择一个大素数p和一个本原根g 选择一个秘密整数 并且计算 p g y 公开 x秘密保存 注 EIGamal签名方案的安全性在于x的保密性 由于离散对数学问题难解 很难由 p g y 确
  • 电脑上显示打印机无法连接服务器错误代码,电脑怎么连接打印机显示错误代码的解决办法...

    下面来看看小编为您整理的电脑怎么连接打印机显示错误代码的答案 电脑怎么连接打印机显示错误代码内容导航1 连接不上打印机错误0x00000709 打印机出现0x00000709错误代码可能是因为网络或者打印设置错误 具体解决步骤如下 1 首先
  • 关于APP接口设计

    最近一段时间一直在做APP接口 总结一下APP接口开发过程中的注意事项 1 效率 接口访问速度 APP有别于WEB服务 对服务器端要求是比较严格的 在移动端有限的带宽条件下 要求接口响应速度要快 所有在开发过程中尽量选择效率高的框架 PHP
  • golang获取当前时间,前n天时间,以及时间格式的转化

    获取当前时间 currentTime time Now currentTime 的结果为go的时间time类型 2018 09 27 13 24 58 287714118 0000 UTC 获取前n天的时间 获取两天前的时间 current
  • idea中jar包依赖了但还是找不到类的解决方案

    新项目check到本地 导入到idea中后 编译的时候很多类都报错了 打开发现有些框架中的类找不到 现象为 控制台报错 点击这个包 明明发现是有这个依赖的 说明项目是依赖了这个jar包的 打开项目配置 查看依赖树 问题找到 idea这里将这
  • 机器学习实验基础

    文章目录 一 机器学习是什么 二 实验方法和原则 1 评价指标 1回归任务 2分类任务 3特定任务 2 数据集 3 实验验证 随机重复实验 K fold 交叉实验 三 总结 课程链接 学堂在线 张敏老师机器学习算法训练营 一 机器学习是什么
  • sparkStreaming对接kafka

    ReceiverAPI 需要一个专门的Executor去接收数据 然后发送给其他的Executor做计算 存在的问题 接收数据的Executor和计算的Executor速度会有所不同 特别在接收数据的Executor速度大于计算的Execu