SparkStreaming与Kafka010之05之01 Consumer

2023-11-11

package Kafka010

import Kafka010.Utils.MyKafkaUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by Shi shuai RollerQing on 2019/12/24 19:47
 *
 * kakfa的API 0-10版本的Consumer测试
 */
//TODO :  kakfa的API 0-10版本的Consumer测试
object Kafka010Demo01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName(s"${this.getClass.getCanonicalName}")
    val ssc = new StreamingContext(conf, Seconds(5))

    val topic = List("topicA") //后面的ConsumerStrategies的参数要求topic为集合的形式 可能不止一个topic
    val kafkaParams = MyKafkaUtils.getKafkaConsumerParams("SparkKafka010")

    val ds: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topic, kafkaParams)
    )

    ds.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        println(rdd.count())
      }
    })
    ssc.start()
    ssc.awaitTermination()

  }
}
   //这个KafkaUtils.createDirectStream要规定kafka的k v的类型 然后三个参数 一个ssc 另外两个位置策略和消费者策略点进去看看
   // PreferConsistent: Use this in most cases, it will consistently distribute partitions across all executors.
   // PreferBrokers: Use this only if your executors are on the same nodes as your Kafka brokers.
   // PreferFixed: Use this to place particular TopicPartitions on particular hosts if your load is uneven.
   // Any TopicPartition not specified in the map will use a consistent location.

// Assign: 消费部分分区

工具类

package Kafka010.Utils

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

/**
 * Created by Shi shuai RollerQing on 2019/12/24 19:20
 */
object MyKafkaUtils {

  def getKafkaConsumerParams(grouid: String = "SparkStreaming010", autoCommit: String = "true"): Map[String, String] = {
    val kafkaParams = Map[String, String] (
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> autoCommit,
      //ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",//earliest、 none 、latest 具体含义可以点进去看
      ConsumerConfig.GROUP_ID_CONFIG -> grouid,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer].getName,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer].getName
    )
    kafkaParams
  }

  /**
   * 这个是官网的写kafka配置的写法,不过还是推荐使用第一种,这样不用自己写参数,避免手误
   *
   * 这个没有经过测试 要是使用也要改下 传参数进来 比如跟上面一样的groupid 要不就使用默认的
   * @return
   */
  def getKafkaConsumerParams2(): Map[String, Object] = {
    val kafkaParams = Map[String, Object] {
      "bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092"
      "key.deserializer" -> classOf[StringDeserializer]
      "value.deserializer" -> classOf[StringDeserializer]
      "auto.offset.reset" -> "latest"
      "group.id" -> "topicA"
      "enable.auto.commit" -> (true: java.lang.Boolean)
    }
    kafkaParams
  }

  def main(args: Array[String]): Unit = {
    println(classOf[StringDeserializer].getName) //org.apache.kafka.common.serialization.StringDeserializer
    println(classOf[StringDeserializer].getClass) //class java.lang.Class
    println(classOf[StringDeserializer]) //class org.apache.kafka.common.serialization.StringDeserializer

  }
}

结果没错 求的就是每5s的批次的数据条数
在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

SparkStreaming与Kafka010之05之01 Consumer 的相关文章

  • 记一次Spark打包错误:object java.lang.Object in compiler mirror

    使用maven compile和package 一直报错scala reflect internal MissingRequirementError object scala runtime in compiler mirror not f
  • spark-submit 报错 Initial job has not accepted any resources

    spark submit 报这样的错误 WARN scheduler TaskSchedulerImpl Initial job has not accepted any resources check your cluster UI to
  • 大数据技术之Kafka——Kafka入门

    目录 一 概述 1 1 为什么要有Kafka 1 2 定义 1 3 消息队列 1 消息队列的应用场景 2 消息队列的两种模式 1 4 基础架构 二 Producer生产者 2 1 生产者消息发送流程 2 1 1 发送原理 2 2 异步发送A
  • Flink设置Source数据源使用kafka获取数据

    流处理说明 有边界的流bounded stream 批数据 无边界的流unbounded stream 真正的流数据 Source 基于集合 package com pzb source import org apache flink ap
  • Hudi和Kudu的比较

    与Kudu相比 Kudu是一个支持OLTP workload的数据存储系统 而Hudi的设计目标是基于Hadoop兼容的文件系统 如HDFS S3等 重度依赖Spark的数据处理能力来实现增量处理和丰富的查询能力 Hudi支持Increme
  • ELK配置记录(filebeat+kafka+Logstash+Elasticsearch+Kibana)

    一 简介 elk日志平台 日志收集 分析和展示的解决方案 满足用户对 志的查询 排序 统计需求 elk架构 filebeat 采集 kafka Logstash 管道 Elasticsearch 存储 搜索 Kibana 日志应用 各组件功
  • spark groupByKey和groupBy,groupByKey和reduceByKey的区别

    1 groupByKey Vs groupBy 用于对pairRDD按照key进行排序 author starxhong object Test def main args Array String Unit val sparkConf n
  • [Docker]使用Docker部署Kafka

    Kafka 是一个分布式流处理平台 它依赖于 ZooKeeper 作为其协调服务 在 Kafka 集群中 ZooKeeper 负责管理和协调 Kafka 的各个节点 因此 要在 Docker 容器中启动 Kafka 通常需要同时启动一个 Z
  • WebSocket + kafka实时推送数据(springboot纯后台)

    逻辑 kafka订阅消费者主题 消费后通过webSocket推送到前端 kafka vue financial webSocket 学习引用 SpringBoot2 0集成WebSocket 实现后台向前端推送信息 World Of Mos
  • [分布式] zookeeper集群与kafka集群

    目录 一 Zookeeper 概述 1 1 Zookeeper定义 1 2 Zookeeper 工作机制 1 3 Zookeeper 特点 1 4 Zookeeper 数据结构 1 5 Zookeeper 应用场景 1 6 Zookeepe
  • 【硬刚大数据之学习路线篇】2021年从零到大数据专家的学习指南(全面升级版)

    欢迎关注博客主页 https blog csdn net u013411339 本文由 王知无 原创 首发于 CSDN博客 本文首发CSDN论坛 未经过官方和本人允许 严禁转载 欢迎点赞 收藏 留言 欢迎留言交流 声明 本篇博客在我之前发表
  • Spark的常用概念总结

    提示 文章写完后 目录可以自动生成 如何生成可参考右边的帮助文档 文章目录 前言 一 基本概念 1 RDD的生成 2 RDD的存储 3 Dependency 4 Transformation和Action 4 1 Transformatio
  • sparkstreamming 消费kafka(1)

    pom
  • Kafka 监控系统Eagle 使用教程 V1.4.0

    1 下载安装zookeeper 2 下载安装kafka 3 下载安装kafka eagle http download kafka eagle org tar zvxf kafka eagle bin 1 4 0 tar gz 4 配置JA
  • JAVA 安装与简单使用

    JAVA简易安装 下载安装 环境变量 进入变量界面 设置变量 验证JAVA环境 运行Java程序 个人站 ghzzz cn 还在备案 很快就能访问了 下载安装 第一步当然是从官网下载安装java了 网上有很多的教程 这里简单的写一下 在这里
  • python+django基于Spark的国漫画推荐系统 可视化大屏分析

    国漫推荐信息是现如今社会信息交流中一个重要的组成部分 本文将从国漫推荐管理的需求和现状进行分析 使得本系统的设计实现具有可使用的价 做出一个实用性好的国漫推荐系统 使其能满足用户的需求 并可以让用户更方便快捷地国漫推荐 国漫推荐系统的设计开
  • 2023_Spark_实验二十九:Flume配置KafkaSink

    实验目的 掌握Flume采集数据发送到Kafka的方法 实验方法 通过配置Flume的KafkaSink采集数据到Kafka中 实验步骤 一 明确日志采集方式 一般Flume采集日志source有两种方式 1 Exec类型的Source 可
  • Kafka基础—3、Kafka 消费者API

    一 Kafka消费者API 1 消息消费 当我们谈论 Kafka 消费者 API 中的消息消费时 我们指的是消费者如何从 Kafka 主题中拉取消息 并对这些消息进行处理的过程 消费者是 Kafka 中的消息接收端 它从指定的主题中获取消息
  • 消息队列选型:Kafka 如何实现高性能?

    在分布式消息模块中 我将对消息队列中应用最广泛的 Kafka 和 RocketMQ 进行梳理 以便于你在应用中可以更好地进行消息队列选型 另外 这两款消息队列也是面试的高频考点 所以 本文我们就一起来看一下 Kafka 是如何实现高性能的
  • 阿里技术官亲笔力作:Kafka限量笔记,一本书助你掌握Kafka的精髓

    前言 分布式 堪称程序员江湖中的一把利器 无论面试还是职场 皆是不可或缺的技能 而Kafka 这款分布式发布订阅消息队列的璀璨明珠 其魅力之强大 无与伦比 对于Kafka的奥秘 我们仍需继续探索 要论对Kafka的熟悉程度 恐怕阿里的大佬们

随机推荐

  • Linux下多进程通信(signal,pipe)

    操作系统实验导航 实验一 银行家算法 https blog csdn net weixin 46291251 article details 115384510 实验二 多级队列调度和多级反馈队列调度算法 https blog csdn n
  • GpuMat ROI

    在引用GpuMat数据的ROI时 需要保证该数据在Gpu 内存中存储是连续的 使用gpu createContinuous创建连续空间 cuda GpuMat dst pyr laplace tmp dst pyr laplace gpu
  • LL(1)文法构造FIRST、FOLLOW、分析表并分析

    一 实验目的 学生运用编译原理的知识在实验技能和方法自行设计实验方案并加以实现 二 使用仪器 器材 计算机一台 操作系统 Windows10 编程软件 Intellij IDEA 三 实验内容及原理 1 实验内容 输入任意一个正确的文法G
  • Windows音量变化通知 - 系统音量监控

    Windows音量变化通知 系统音量监控 Endpoint Volume Controls 1 实现IAudioEndpointVolumeCallback接口 2 主函数 总结 参考 Endpoint Volume Controls 本次
  • 婚姻好不好,嫁给谁很重要

    都说幸福的婚姻是相似的 不幸的婚姻各有各的不幸 事实上 那些不幸的婚姻 追根究底不过都是找错了人 婚姻好不好 关键就在于嫁给谁 因为 值得相信的从来不是感情 而是人 人若靠谱 婚姻便可靠 人若靠不住 婚姻迟早生变 这世上 有的夫妻恩恩爱爱
  • 一次发生在JVM新生代和老年代的GC过程简述

    首先 我们假设程序当前的堆空间的情况如下 然后 程序在运行过程中 开始了我们的第一次YoungGC 年轻代GC 得到如下的图 通过这次的GC 我们的2 3 4对象都被回收了 只有1对象得到了保留 进入了S1 幸存者区 然后我们的程序在运行的
  • Java 通过Soap方式调用WebService接口

    import org apache commons lang3 StringEscapeUtils import org apache http HttpEntity import org apache http client config
  • 短视频seo抖音矩阵源码开发搭建技术解析

    一 短视频seo抖音矩阵源码开发需要考虑以下几个方面 技术选型 选择合适的开发语言 框架和数据库 常用的开发语言有Java PHP等 常用的框架有Spring Django等 常用的数据库有MySQL MongoDB等 服务器的选择 根据应
  • 如何在 NodeJs 中上传、处理和存储文件:分步手册

    存储文件有三种基本方法 1 直接将其存储在数据库中 2 将其存储在文件系统中并将路径保存到数据库 3 将其存储在某些云存储中 例如 Amazon S3 Google Cloud Storage 或 Microsoft Azure Blob
  • 去除自定义AlertDialog黑边

    http blog csdn net mwj 88 article details 45482421 1 现象描述 html view plain copy View view LayoutInflater from getActivity
  • java学习笔记——day1

    java笔记 字面量 变量 数据类型 命名规则 类型转换 运算符operator API 程序的流程控制 数组 字面量 变量 字面量 计算机用来处理数据的 字面量就是告诉程序员 数据在程序中的书写格式 字符 单引号 一个字符 字符串 双引号
  • python+selenium自动化软件测试(第3章):unittes

    3 1 unittest简介 前言 python基础比较弱的 建议大家多花点时间把基础语法学好 这里有套视频 可以照着练习下 http pan baidu com s 1i44jZdb 密码 92fs 熟悉java的应该都清楚常见的单元测试
  • 分层测试(一):什么是分层测试?

    什么是分层测试 分层测试是通过对质量问题分类 分层来保证整体系统质量的测试体系 模块内通过接口测试保证模块质量 多模块之间通过集成测试保证通信路径和模块间交互质量 整体系统通过端到端用例对核心业务场景进行验证 用户体验通过手工测试确保无妨碍
  • Unity开发(2)建片草地

    文章目录 1 简述 2 创建 2 1 创建项目 2 2 进入开发窗体 3 建个地面 3 1 新建地面 3 2 调整地面大小 3 3 添加草地 3 3 1 初识Unity图片资源 3 3 2 添加图片资源 3 3 3 修改图片在场景中大小 1
  • C语言入门知识1(零基础新手适用)

    C语言入门知识1 零基础新手适用 程序语言 1 机器语言 机器语言是低级语言 是用01码来编写的二进制代码语言 2 汇编语言 汇编语言也是低级语言 是用英文字母和符号串编写的 3 高级语言 由于汇编语言依赖于硬件体系且符合较多 为了方便高级
  • Go中 defer的使用

    文章目录 简介 示例 使用场景 捕获异常 文件操作 简介 defer 是 Golang 中的一个非常有用的关键字 它用于注册延迟调用 也就是一个函数的执行被延迟到调用它的函数返回之后 常用于资源清理 异常处理等场景 示例 defer 是注册
  • python实现电子邮件编程

    一 几个专业名词 MUA MTA MDA 假设我们自己的电子邮件地址是me 163 com 对方的电子邮件地址是friend sina com 注意地址都是虚构的哈 现在我们用Outlook或者Foxmail之类的软件写好邮件 填上对方的E
  • C++提高8: 类模板成员函数类外实现和类模板分文件编写

    1 类模板成员函数类外实现 类外实现主要有三个关键点 作用域 识别T的数据类型 告诉编译器这是一个类模板 剩下的 就还是基础的类内声明类外定义实现了 直接上代码观察一下 include
  • redis后台实现投票功能

    原创文章 转载请注明出处https blog csdn net qq 41969845 article details 108406059 一 前言 本文以投票功能为例 从实际例子中熟练掌握redis的应用 阅读本文需要有一定的Java基础
  • SparkStreaming与Kafka010之05之01 Consumer

    package Kafka010 import Kafka010 Utils MyKafkaUtils import org apache kafka clients consumer ConsumerRecord import org a