kafka 使用python消费consumer

2023-11-13

参考

python kafka 使用
大数据:kafka常见问题
kafka
python之操作kafka

Kafka基本了解

使用python读取consumer中的数据

安装kafka-python

pip install kafka-python

简单使用

import kafka import KafkaConsumer
#消费kafka中最新的数据 并且自动提交offsets[消息的偏移量]
consumer = KafkaConsumer('my-topic',group_id='my-group',bootstrap_servers=['localhost:9092'])
for message in consumer:
    #注意: message ,value都是原始的字节数据,需要decode
    #例如: message.value.decode('utf-8')
    #完成对每条数据中的操作
    print ("%s:%d:%d: key=%s value=%s" %s (message.topic, message.partition,
                                               message.offset, message.key,
                                               message.value))

其他使用

topic="****"
groupid="****"
brokerlist="*:9092,*:9092"

#读取目前可读最早的消息
consumer = KafkaConsumer(topic, auto_offset_reset='earliest', bootstrap_servers=brokerlist)
#获取topic主题的分区信息
consumer.partitions_for_topic(topic)  
#获取主题列表
print consumer.topics()  
#获取当前消费者订阅的主题
print consumer.subscription()  
#获取当前消费者topic、分区信息
print consumer.assignment()  
#获取当前消费者可消费的偏移量
print consumer.beginning_offsets(consumer.assignment()) 
#重置偏移量,从第5个偏移量消费
consumer.seek(TopicPartition(topic=topic, partition=0), 5)
#获取当前主题的最新偏移量
print consumer.position(TopicPartition(topic=u'test', partition=0)) 

#消费多个主题
consumer = KafkaConsumer(bootstrap_servers=brokerlist)
consumer.subscribe(topics=('test','test0'))  #订阅要消费的主题
for message in consumer:
。。。。

#手动拉取消息
while True:
    msg = consumer.poll(timeout_ms=5)   #从kafka获取消息
    print msg
    time.sleep(1)

#消息挂起与恢复
consumer.pause(TopicPartition(topic=u'test', partition=0))
print consumer.paused()   #获取当前挂起的消费者
#处理操作
consumer.resume(TopicPartition(topic=u'test', partition=0))
#pause执行后,consumer不能读取,直到调用resume后恢复。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

kafka 使用python消费consumer 的相关文章

随机推荐

  • 【ARM】Linux内核驱动之中断

    作者主页 凉开水白菜 作者简介 共同学习 互相监督 热于分享 多加讨论 一起进步 专栏资料 https gitee com stylle linux code 点赞 收藏 再看 养成习惯 订阅的粉丝可通过PC端文末加我微信 可对文章的内容进
  • Microsoft.Web.WebView2 初体验

    上篇已经介绍了WebView2的背景 今天有时间尝试一下 文档地址 https docs microsoft com zh cn dotnet api microsoft web webview2 winforms webview2 exe
  • Django基于用户画像的电影推荐系统源码(项目源代码)

    一 项目介绍 公众号 yk 坤帝 获取全部源代码 本系统是以Django作为基础框架 采用MTV模式 数据库使用MongoDB MySQL和Redis 以从豆瓣平台爬取的电影数据作为基础数据源 主要基于用户的基本信息和使用操作记录等行为信息
  • 微信分享引导页效果

    span style font size 18px span
  • java的rmi

    写在前面 本文看下如何通过Java原生提供的rmi功能来调用远端JVM对象的方法 并获取其结果 1 定义远端service pojo 使用了lombok Getter Setter ToString public class User im
  • 在较新版pycharm中使用conda虚拟环境的两种方法-保姆级教程

    文章目录 方法一 配置解释器 方法二 命令行直接调用 注意事项 方法一 配置解释器 首先创建new project 之后等待配置索引等文件即可 方法二 命令行直接调用 在终端terminal中直接使用conda activate xxx 但
  • 计算机网络工程毕业设计题目选题大全

    文章目录 0 简介 1 如何选题 2 最新网络工程选题 2 1 Java web SSM 系统 2 2 大数据方向 2 3 人工智能方向 2 4 其他方向 4 最后 0 简介 学长搜集分享最新的网络工程专业毕设毕设选题 难度适中 适合作为毕
  • js复制一个对象的方法,不改变原对象

    复制一个对象 不改变原对象简单方法如下 var obj a 1 b 2 es6新方法 Object assign var newObj Object assign obj es6新方法 扩展运算符 var newObj obj 有个弊端 就
  • 什么是网络编程?

    目录 一 UDP DatagramSocket DatagramPacket 服务器 客户端 二 TCP ServerSocket Socke 服务器 客户端 网络编程指的就是网络上的主机通过不同的进程 以编程的方式实现网络信息传输 而提到
  • CSDN如何调节成黑色主题,手把手教学

    今天师弟来问我 有没有把浏览器调节成黑色主题的东东 每天看网页很久 白色太刺眼了 作为未来的新晋程序员 也显得不专业 想到自己有个插件 就分享了出来 获得好评 有需求的地方就有分享 首先上效果图 随便打开一篇文章看一看 教程 首先 要用谷歌
  • 5G技术详解系列-PDU会话签约数据(6)

    相关文章会在公众号同步更新 公众号 5G通信大家学 持续更新的相关5G内容都是直接根据3GPP整理 保证更新内容的准确性 避免通过二手 甚至多手的资料 以讹传讹误导网友 在介绍完流程详解后 会整理专题内容 比如切片 服务发现 QoS流端到端
  • 我的世界java版怎么加整合包_我的世界Minecraft Mod(模组)安装指南

    前言 Mod的安装方法主要分为核心Jar文件手动覆盖安装和使用Forge加载 现在大多数的Mod基本都是依赖于Forge来加载Mod 不过对于刚接触我的世界的玩家来说 在安装Mod的时候也是一头雾水 导致安装Mod后 出现诸如游戏崩溃 黑屏
  • 【从零开始的Java开发】1-6-2 泛型:概述、泛型作为方法参数、自定义泛型、自定义泛型方法

    文章目录 泛型概述 泛型作为方法参数 自定义泛型 一个参数 两个参数 自定义泛型方法 总结 泛型概述 为什么要有泛型 在Java增加泛型之前 泛型程序设计使用继承来实现 坏处 需要强制转换 可向集合中添加任意类型的对象 存在风险 泛型的使用
  • 主干光缆线路的组网结构

    主干光缆是指连接主干光交与业务汇聚节点 以及主干光交之间的光缆 业务汇聚点指安装了OLT设备的节点 主干光交内的业务端口与业务汇聚点ODF间的光纤链路部分或全部是直连的 中间没有活动连接 主干光缆线路的组网结构一般分为 环形 树形和星形 1
  • 嵌入式Linux设备读取CPU温度的方法

    1 ARM 平台下 cat sys devices virtual thermal thermal zone0 temp62374 cat sys class thermal thermal zone0 temp 64036x86 平台下
  • vue项目配置rem移动端适配

    一 项目介绍 脚手架CLI vue cli Vue版本 2 6 11 移动UI组件库 Vant 2 10 14 CSS预处理器 sass 二 配置lib flexible插件 下载插件 npm i D lib flexible 导入 在sr
  • 据说,80%的人都搞不懂哈希算法 区块链 哈希算法

    本文约9000字 阅读 观看 需要52分钟 聊到区块链的时候也少不了会听到 哈希 哈希函数 哈希算法 是不是听得一头雾水 别急 这一讲我们来讲讲什么是哈希算法 哈希是一种加密算法 哈希函数 Hash Function 也称为散列函数或杂凑函
  • 车辆贷款违约预测

    1 案例介绍 国内某贷款机构的车贷业务面临借款人拖欠还款或拒不还款 导致该机构的不良贷款率居高不下的问题 该机构将部分贷款数据开放 诚邀大家帮助他们建立风险识别模型来预测可能违约的借款人 敏感信息已脱敏 给定某机构实际业务中的相关借款人信息
  • 数据库的4种隔离级别

    数据库事务的隔离级别有4种 由低到高分别为Read uncommitted Read committed Repeatable read Serializable 而且 在事务的并发操作中可能会出现脏读 不可重复读 幻读 下面通过事例一一阐
  • kafka 使用python消费consumer

    参考 python kafka 使用 大数据 kafka常见问题 kafka python之操作kafka Kafka基本了解 使用python读取consumer中的数据 安装kafka python pip install kafka