Kafka偏移量(Offset)管理

2023-11-02

1.定义

Kafka中的每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序号,用于partition唯一标识一条消息。

Offset记录着下一条将要发送给Consumer的消息的序号。

流处理系统常见的三种语义:

最多一次 每个记录要么处理一次,要么根本不处理
至少一次 这比最多一次强,因为它确保不会丢失任何数据。但是可能有重复的
有且仅有一次 每条记录将被精确处理一次,没有数据会丢失,也没有数据会被多次处理

The semantics of streaming systems are often captured in terms of how many times each record can be processed by the system. There are three types of guarantees that a system can provide under all possible operating conditions (despite failures, etc.)

  1. At most once: Each record will be either processed once or not processed at all.
  2. At least once: Each record will be processed one or more times. This is stronger than at-most once as it ensure that no data will be lost. But there may be duplicates.
  3. Exactly once: Each record will be processed exactly once - no data will be lost and no data will be processed multiple times. This is obviously the strongest guarantee of the three.

 2.Kafka offset Management with Spark Streaming

Offset首先建议存放到Zookeeper中,Zookeeper相比于HBASE等来说更为轻量级,且是做HA(高可用性集群,High Available)的,offset更安全。

对于offset管理常见的两步操作:

  • 保存offsets
  • 获取offsets

3.环境准备

启动一个Kafka生产者,测试使用topic:tp_kafka:

./kafka-console-producer.sh --broker-list hadoop000:9092 --topic tp_kafka

启动一个Kafka消费者:

./kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic tp_kafka

在IDEA中生产数据:

package com.taipark.spark;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;
import java.util.UUID;

public class KafkaApp {

    public static void main(String[] args) {
        String topic = "tp_kafka";

        Properties props = new Properties();
        props.put("serializer.class","kafka.serializer.StringEncoder");
        props.put("metadata.broker.list","hadoop000:9092");
        props.put("request.required.acks","1");
        props.put("partitioner.class","kafka.producer.DefaultPartitioner");
        Producer<String,String> producer = new Producer<>(new ProducerConfig(props));

        for(int index = 0;index <100; index++){
            KeyedMessage<String, String> message = new KeyedMessage<>(topic, index + "", "taipark" + UUID.randomUUID());
            producer.send(message);
        }
        System.out.println("数据生产完毕");

    }
}

4.第一种offset管理方式:smallest

Spark Streaming链接Kafka统计个数:

package com.taipark.spark.offset

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Offset01App {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("Offset01App")
    val ssc = new StreamingContext(sparkConf,Seconds(10))

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "hadoop000:9092",
      "auto.offset.reset" -> "smallest"
    )
    val topics = "tp_kafka".split(",").toSet
    val messages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)

    messages.foreachRDD(rdd=>{
      if(!rdd.isEmpty()){
        println("Taipark" + rdd.count())
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }

}

再生产100条Kafka数据->Spark Streaming接受:

但这时如果Spark Streaming停止后重启:

会发现这里重头开始计数了,原因是代码里将auto.offset.reset的值设置为了smallest。(kafka-0.10.1.X版本之前)

5.第二种offset管理方式:checkpoint

在HDFS中创建一个/offset文件夹:

hadoop fs -mkdir /offset

使用Checkpoint:

package com.taipark.spark.offset

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

object Offset01App {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("Offset01App")

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "hadoop000:9092",
      "auto.offset.reset" -> "smallest"
    )
    val topics = "tp_kafka".split(",").toSet
    val checkpointDirectory = "hdfs://hadoop000:8020/offset/"
    def functionToCreateContext():StreamingContext = {
      val ssc = new StreamingContext(sparkConf,Seconds(10))
      val messages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
      //设置checkpoint
      ssc.checkpoint(checkpointDirectory)
      messages.checkpoint(Duration(10*1000))

      messages.foreachRDD(rdd=>{
        if(!rdd.isEmpty()){
          println("Taipark" + rdd.count())
        }
      })

      ssc
    }
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,functionToCreateContext _)




    ssc.start()
    ssc.awaitTermination()
  }

}

注:IDEA修改HDFS用户,在设置里的VM options中:

-DHADOOP_USER_NAME=hadoop

先启动:

发现消费了之前的100条。这是停止之后,生产100条,再启动:

发现这里只读取了上次结束到这次启动之间的100条,而不是像smallest一样读取之前所有条数。

但是checkpiont存在问题,如果采用这种方式管理offset,只要业务逻辑发生了变化,则checkpoint就没有作用了。因为其调用的是getOrCreate()。

6.第三种offset管理方式:手动管理偏移量

思路:

  1. 创建StreamingContext
  2. 从Kafka获取数据  <== 拿到offset
  3. 根据业务逻辑进行处理
  4. 将处理结果写入外部存储 ==>保存offset
  5. 启动程序等待线程终止
package com.taipark.spark.offset

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Offset01App {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("Offset01App")
    val ssc = new StreamingContext(sparkConf,Seconds(10))


    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "hadoop000:9092",
      "auto.offset.reset" -> "smallest"
    )
    val topics = "tp_kafka".split(",").toSet
    //从某地获取偏移量
    val fromOffsets = Map[TopicAndPartition,Long]()

    val messages = if(fromOffsets.size == 0){  //从头消费
      KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
    }else{  //从指定偏移量消费

      val messageHandler = (mm:MessageAndMetadata[String,String]) => (mm.key,mm.message())
      KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,(String,String)](ssc,kafkaParams,fromOffsets,messageHandler)

      )
    }

    messages.foreachRDD(rdd=>{
      if(!rdd.isEmpty()){
        //业务逻辑
        println("Taipark" + rdd.count())

        //将offset提交保存到某地
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        offsetRanges.foreach(x =>{
          //提交如下信息提交到外部存储
          println(s"${x.topic} ${x.partition} ${x.fromOffset} ${x.untilOffset}")
        })
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }

}
  • 先保存offset后保存数据可能导致数据丢失
  • 先保存数据后保存offset可能导致数据重复执行

解决方式1:实现幂等(idempotent)

在编程中一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。

解决方式2:事务 (transaction)

1.数据库事务可以包含一个或多个数据库操作,但这些操作构成一个逻辑上的整体。

2.构成逻辑整体的这些数据库操作,要么全部执行成功,要么全部不执行。

3.构成事务的所有操作,要么全都对数据库产生影响,要么全都不产生影响,即不管事务是否执行成功,数据库总能保持一致性状态。

4.以上即使在数据库出现故障以及并发事务存在的情况下依然成立。

将业务逻辑与offset保存放在一个事务里,仅执行一次。

7.Kafka-0.10.1.X版本之后的auto.kafka.reset:

earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka偏移量(Offset)管理 的相关文章

  • kafka系列——KafkaProducer源码分析

    实例化过程 在KafkaProducer的构造方法中 根据配置项主要完成以下对象或数据结构的实例化 配置项中解析出 clientId 用于跟踪程序运行情况 在有多个KafkProducer时 若没有配置 client id则clientId
  • shell脚本,一次性启动kafka集群

    版本centos6 5 64位操作系统 已配置JDK1 8 三个节点 在s121节点上可以免密登录到另外两个节点 另外kafka0 9 0 1的安装目录相同 修改了主机名 并在每个节点的hosts文件中设置了映射 脚本内容 bin bash
  • 如何偏移多边形边?

    I have a list of point2D that makes a closed polygon Now I want to create another set of 2D points by offsetting the pol
  • 如何在 Javascript 中获取对象在页面上的绝对位置? [复制]

    这个问题在这里已经有答案了 我想在 Javascript 中获取页面上对象的绝对 x y 位置 我怎样才能做到这一点 I tried obj offsetTop and obj offsetLeft 但这些仅给出相对于父元素的位置 我想我可
  • VBA/Excel 中行和列范围偏移的最大值是多少?

    我正在使用 microsoft excel 2003 执行以下 If 语句时收到 应用程序定义或对象定义错误 如果 Range MyData CurrentRegion Offset i 0 Resize 1 1 Value Range M
  • 消息队列选型:Kafka 如何实现高性能?

    在分布式消息模块中 我将对消息队列中应用最广泛的 Kafka 和 RocketMQ 进行梳理 以便于你在应用中可以更好地进行消息队列选型 另外 这两款消息队列也是面试的高频考点 所以 本文我们就一起来看一下 Kafka 是如何实现高性能的
  • prevObject 是什么?为什么我的选择器返回它?

    我试图从元素中获取顶部 但收到此错误 这是什么意思以及如何摆脱它 hover offset top gt Uncaught TypeError Cannot read property top of undefined hover div
  • 我有一个 has_many 关系,我想设置自定义限制和偏移量。以及计算它们

    Hy My code profile images 我只想一次只获取 10 张图像 偏移量为 10 就像这样 profile images limit gt 10 offset gt 10 不是这样的 has many images lim
  • 未定义的偏移量:1

    在我当前的 PHP 脚本中出现此错误 未定义的偏移量 1 我的代码在这里 query SELECT item id username item content FROM updates ORDER BY update time DESC L
  • 如何删除消费者已经消费过的数据?卡夫卡

    我正在kafka中进行数据复制 但是 kafka 日志文件的大小增长得非常快 一天的大小达到 5 GB 作为这个问题的解决方案 我想立即删除已处理的数据 我正在 AdminClient 中使用删除记录方法来删除偏移量 但是当我查看日志文件时
  • 如何在编译时计算类成员的偏移量?

    给定 C 中的类定义 class A public methods definition private int i char str 是否可以使用 C 模板元编程在编译时计算类成员的偏移量 该类不是 POD 并且可以具有虚拟方法 基元和对
  • 如何使用直骨架计算多边形的斜接偏移

    我有一个用 Python 实现的 Straight Skeleton 算法 想用它来偏移多边形的边缘 我看过几篇提出这种抵消方法的论文 遗憾的是它们都没有提供有关如何实现它的具体信息 他们之中 带孔简单二维多边形直骨架的 CGAL 实现 h
  • postgresql:偏移+限制变得非常慢

    我有一张桌子tmp drop ids用一列 id 以及 330 万个条目 我想迭代该表 每 200 个条目执行一些操作 我有这个代码 LIMIT 200 for offset in xrange 0 drop count LIMIT LIM
  • SQL Server 的 LIMIT 和 OFFSET 等效吗?

    在 PostgreSQL 中有Limit and Offset关键字将允许非常轻松地对结果集进行分页 SQL Server 的等效语法是什么 SQL Server 2012 中现在可以轻松实现此功能 从 SQL Server 2012 开始
  • offsetLeftAndRight() 到底做了什么?

    offsetLeftAndRight 到底做了什么 文档说 将此视图的水平位置偏移指定的像素量 那么 这是否意味着如果视图的左侧位置假设为 50 并且在其上调用 offsetLeftAndRight 20 那么视图将移动 20 像素并且其左
  • 未将 Win32 可移植可执行映像映射到偏移量 0 处的可能原因有哪些?

    我最近一直在研究 Window 的 PE 格式 我注意到在大多数示例中 人们倾向于设定ImageBase中的偏移值optional header到一些不合理的高的东西 比如0x400000 什么可能使它不利not在偏移处映射图像0x0 首先
  • PHP 警告:非法字符串偏移

    我是 PHP 新手 今天 PHP 从 5 3 3 版本迁移到 5 4 4 版本 Debian Squeeze 到 Debian Wheezy 之后 我从 Apache 日志中收到此错误 gt PHP 警告 xyz 中的非法字符串偏移 php
  • 如何为 MySQL 数据库中的所有时间戳/DATETIME 添加偏移量?

    我有一些 MySQL 数据库 其中有多个表 其中包含 除其他外 一些 DATETIME 列 我正在寻找一种方法来向整个数据库中的所有 DATETIME 列添加一定的时间 比如一年 如果数据最初写入数据库时 系统时间错误 这会很有用 或者 就
  • C 代码获取相对于 UTC 的本地时间偏移(以分钟为单位)?

    我没有找到一种简单的方法来获取本地时间和 UTC 时间之间的时间偏移 以分钟为单位 起初我打算使用tzset 但它不提供夏令时 根据手册页 如果夏令时有效 它只是一个不为零的整数 虽然通常是一个小时 但在某些国家 地区可能是半小时 我宁愿避
  • Ruby 在带有偏移量的数组中查找

    我正在寻找一种以更简洁的方式在 Ruby 中执行以下操作的方法 class Array def find index with offset offset block offset 1 find block end end offset a

随机推荐