kafka enable.auto.commit和auto.offset.reset使用说明

enable.auto.commit

是否自动提交offset,默认是true。

auto.offset.reset

表示自动重置 offset。

auto.offset.reset 参数定义了当无法获取消费分区的位移时从何处开始消费。例如:当 Broker 端没有 offset(如第一次消费或 offset 超过7天过期)时如何初始化 offset,当收到 OFFSET_OUT_OF_RANGE 错误时如何重置 Offset。

earliest:自动重置到 partition 的最小 offset。
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。
latest:默认为 latest,表示自动重置到 partition 的最大 offset。
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。
none:不自动进行 offset 重置,抛出 OffsetOutOfRangeException 异常。
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常。

auto.offset.reset=none 使用说明

使用背景

不希望发生 offset 自动重置的情况,因为业务不允许发生大规模的重复消费。

注意:

此时消费组在第一次消费的时候就会找不到 offset 而报错,这时就需要在 catch 里手动设置 offset。

使用说明

auto.offset.reset 设置为 None 以后,可以避免 offset 自动重置的问题,但是当增加分区的时候,因为关闭了自动重置机制,客户端不知道新的分区要从哪里开始消费,则会产生异常,此时需要人工去设置消费分组 offset 并消费。

使用方式

消费者在消费时,当 consumer 设置 auto.offset.reset=none, 捕获到 NoOffsetForPartitionException 异常,在 catch 里自己设置 offset。您可以根据自身业务情况选择以下方式中的其中一种。

指定 offset,这里需要自己维护 offset,方便重试。

指定从头开始消费。

指定 offset 为最近可用的 offset。

根据时间戳获取 offset,设置 offset。

总结:

package com.tencent.tcb.operation.ckafka.plain;


import com.google.common.collect.Lists;
import com.tencent.tcb.operation.ckafka.JavaKafkaConfigurer;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SaslConfigs;


public class KafkaPlainConsumerDemo {


    public static void main(String args[]) {
        //设置JAAS配置文件的路径。
        JavaKafkaConfigurer.configureSaslPlain();


        //加载kafka.properties。
        Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();


        Properties props = new Properties();
        //设置接入点,请通过控制台获取对应Topic的接入点。
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));


        //接入协议。
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        //Plain方式。
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        //两次Poll之间的最大允许间隔。
        //消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认30s。
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        //每次Poll的最大数量。
        //注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        //消息的反序列化方式。
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        //当前消费实例所属的消费组,请在控制台申请之后填写。
        //属于同一个组的消费实例,会负载消费消息。
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));


        //消费offset的位置。注意!如果auto.offset.reset=none这样设置,消费组在第一次消费的时候 就会报错找不到offset,第一次这时候就需要在catch里手动设置offset。
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
        //构造消费对象,也即生成一个消费实例。
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        //设置消费组订阅的Topic,可以订阅多个。
        //如果GROUP_ID_CONFIG是一样,则订阅的Topic也建议设置成一样。
        List<String> subscribedTopics = new ArrayList<String>();
        //如果需要订阅多个Topic,则在这里添加进去即可。
        //每个Topic需要先在控制台进行创建。
        String topicStr = kafkaProperties.getProperty("topic");
        String[] topics = topicStr.split(",");
        for (String topic : topics) {
            subscribedTopics.add(topic.trim());
        }
        consumer.subscribe(subscribedTopics);
        //循环消费消息。
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                //必须在下次Poll之前消费完这些数据, 且总耗时不得超过SESSION_TIMEOUT_MS_CONFIG。 建议开一个单独的线程池来消费消息,然后异步返回结果。
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(
                            String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
                }
            } catch (NoOffsetForPartitionException e) {
                System.out.println(e.getMessage());


                //当auto.offset.reset设置为 none时,需要捕获异常 自己设置offset。您可以根据自身业务情况选择以下方式中的其中一种。
                //e.g 1 :指定offset, 这里需要自己维护offset,方便重试。
                Map<Integer, Long> partitionBeginOffsetMap = getPartitionOffset(consumer, topicStr, true);
                Map<Integer, Long> partitionEndOffsetMap = getPartitionOffset(consumer, topicStr, false);
                consumer.seek(new TopicPartition(topicStr, 0), 0);


                //e.g 2:从头开始消费
                consumer.seekToBeginning(Lists.newArrayList(new TopicPartition(topicStr, 0)));


                //e.g 3:指定offset为最近可用的offset。
                consumer.seekToEnd(Lists.newArrayList(new TopicPartition(topicStr, 0)));


                //e.g 4: 根据时间戳获取offset,就是根据时间戳去设置offset。例如重置到10分钟前的offset
                Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
                Long value = Instant.now().minus(300, ChronoUnit.SECONDS).toEpochMilli();
                timestampsToSearch.put(new TopicPartition(topicStr, 0), value);
                Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer
                        .offsetsForTimes(timestampsToSearch);
                for (Entry<TopicPartition, OffsetAndTimestamp> entry : topicPartitionOffsetAndTimestampMap
                        .entrySet()) {
                    TopicPartition topicPartition = entry.getKey();
                    OffsetAndTimestamp entryValue = entry.getValue();
                    consumer.seek(topicPartition, entryValue.offset()); // 指定offset, 这里需要自己维护offset,方便重试。
                }




            }
        }
    }


    /**
     * 获取topic的最早、最近的offset
     * @param consumer
     * @param topicStr
     * @param beginOrEnd true begin; false end
     * @return
     */
    private static Map<Integer, Long> getPartitionOffset(KafkaConsumer<String, String> consumer, String topicStr,
            boolean beginOrEnd) {
        Collection<PartitionInfo> partitionInfos = consumer.partitionsFor(topicStr);
        List<TopicPartition> tp = new ArrayList<>();
        Map<Integer, Long> map = new HashMap<>();
        partitionInfos.forEach(str -> tp.add(new TopicPartition(topicStr, str.partition())));
        Map<TopicPartition, Long> topicPartitionLongMap;
        if (beginOrEnd) {
            topicPartitionLongMap = consumer.beginningOffsets(tp);
        } else {
            topicPartitionLongMap = consumer.endOffsets(tp);
        }
        topicPartitionLongMap.forEach((key, beginOffset) -> {
            int partition = key.partition();
            map.put(partition, beginOffset);
        });
        return map;
    }


}

springboot项目下

 /**
     * enable-auto-commit: false 由spring提交
     * enable-auto-commit: true  由kafka提交
     */

    /**
     * enable-auto-commit: true  相同组下  (换组 会重置数据)
     * 如果这个topic某个分区有已经提交的offset,那么无论是把auto.offset.reset=earliest还是latest,都将失效,消费者会从已经提交的offset开始消费.
     */

    /**
     * enable-auto-commit: fasle 相同组下 (换组 会重置数据)
     * 如果这个topic某个分区没有提交的offset,那么把auto.offset.reset=latest,将没消费的设置为提交消费,然后从最后开始消费
     * 如果这个topic某个分区没有提交的offset,那么把auto.offset.reset=earliest,从没开始消费的offset开始消费
     */

非springboot项目下

enable.auto.commit false
    auto.offset.reset earliest 第一次消费, 重启后消费  都会从第一条开始重新消费全部数据
enable.auto.commit true
    auto.offset.reset earliest 第一次消费全部数据,重启后从提交处开始消费

enable.auto.commit false
    auto.offset.reset latest  第一次,重启后会从最后一条开始消费,但没有提交,换成earliest 重新消费全部数据
enable.auto.commit true
   auto.offset.reset latest   第一次从最后一条开始消费,重启后从提交处开始消费

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/554813.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

大量excel文件私密性较强 需要密码保护 如何给excel文件批量加密

一&#xff0c;前言 在现代办公环境中&#xff0c;Excel文件已成为数据存储和交流的常见工具。然而&#xff0c;随着数据泄露和信息安全问题的日益严重&#xff0c;如何保护Excel文件的安全性成为了我们关注的焦点。批量加密Excel文件成为了一种有效的解决方案&#xff0c;它可…

【光伏行业】光伏发电的应用领域

光伏发电作为一种清洁、可再生的能源技术&#xff0c;在多个领域都有广泛的应用。以下是一些光伏发电的主要应用领域&#xff1a; 住宅和商业建筑&#xff1a;光伏板可以安装在屋顶上&#xff0c;为建筑提供电力。这不仅降低了对电网的依赖&#xff0c;还减少了电费支出&#x…

Spring (四) 之配置及配置文件的操作

文章目录 1、Spring 基于注解的配置基于注解的配置引入依赖包配置实体类数据访问层业务层业务层实现测试 2、Bean和Component和Configuration的区别1 Bean:2 Component:3 Configuration:总结&#xff1a; 区别Component和Configuration区别 3、Spring读取properties配置文件准备…

【C++】开始使用优先队列

送给大家一句话: 这世上本来就没有童话&#xff0c;微小的获得都需要付出莫大的努力。 – 简蔓 《巧克力色微凉青春》 开始使用优先队列 1 前言2 优先队列2.1 什么是优先队列2.2 使用手册2.3 仿函数 3 优先队列的实现3.1 基本框架3.2 插入操作3.3 删除操作3.4 其他函数 4 总结T…

【ZYNQ】PS和PL数据交互丨AXI总线(主机模块RTL代码实现)

文章目录 一、PS-PL数据交互桥梁&#xff1a;AXI总线1.1 AXI总线和AXI4总线协议1.2 PS-PL数据传输的主要场景1.2.1 PL通过AXI_HP操作DDR3 Controller读写DDR31.2.2 PS作主机使用GP接口传输数据 1.3 AXI端口带宽理论1.4 AXI 总线的读写分离机制1.5 握手机制1.6 AXI_Lite总线1.7 …

【软考】设计模式之命令模式

目录 1. 说明2. 应用场景3. 结构图4. 构成5. 优缺点5.1 优点5.2 缺点 6. 适用性7.java示例 1. 说明 1.命令模式&#xff08;Command Pattern&#xff09;是一种数据驱动的设计模式。2.属于行为型模式。3.请求以命令的形式被封装在对象中&#xff0c;并传递给调用对象。4.调用对…

制作直通网线和交叉网线

制作直通网线和交叉网线 1. 网络直通线2. 网络交叉线References 双绞线的连接方法有两种&#xff1a;直通连接和交叉连接 。 直通连接是将双绞线的两端分别都依次按白橙、橙、白绿、蓝、白蓝、绿、白棕、棕色的顺序 (国际 EIA/TIA 568B 标准) 压入 RJ45 水晶头内。这种方法制作…

剧本杀小程序:线上剧本杀成为行业必然趋势

剧本杀作为一个社交娱乐游戏方式&#xff0c;受到了年轻人的喜爱。剧本杀是一个新型的游戏方式&#xff0c;能够带大众带来新鲜感和刺激感&#xff0c;让玩家通过角色扮演进行游戏体验&#xff1b;并且剧本杀还具有较强的社交性&#xff0c;在当下快节奏生活下&#xff0c;以游…

【AI】在Windows10下部署本地LLM RAG服务

【背景】 上一篇介绍了如何用Ubuntu命令行部署ollama LLM+RAG服务。部署后等于拥有了基于内网的AI Saas服务,其它内网用户可以通过默认的网址访问Playground对AI进行问答。 【概念】 RAG:通过词向量技术,将文件内容向量化后,通过语言模型以自然交流的形式得到文本相关的…

MySQL表级锁——技术深度+1

引言 本文是对MySQL表级锁的学习&#xff0c;MySQL一直停留在会用的阶段&#xff0c;需要弄清楚锁和事务的原理并DEBUG查看。 PS:本文涉及到的表结构均可从https://github.com/WeiXiao-Hyy/blog中获取&#xff0c;欢迎Star&#xff01; MySQL表级锁 MySQL中表级锁主要有表锁…

【CAD建模号】学习笔记(三):图形绘制区1

图形绘制区介绍 CAD建模号的图形绘制区可以绘制我们所需要的各种3D模型&#xff0c;绘制的图形即为模型对象&#xff0c;包括线、面、体等。 1. 二维图形绘制组 二维图形是建模的基础&#xff0c;大多数复杂的模型都是基于二维图形制作出来的&#xff0c;掌握二维图形的绘制等…

png静图转换gif动图如何操作?轻松一键快速转换gif动图

想要把多张Png格式图片转换成gif格式动图时要怎么操作&#xff1f;图片常见的有静图和动图&#xff0c;而jpg、png、gif等是最常见的图片格式。想要把png格式图片转换成gif动画还不想下载任何软件的时候就可以使用gif制作工具。不需要下载软件在线就能操作。能够轻轻松松就能快…

uniapp开发之【上传图片上传文件】的功能

一、上传图片功能&#xff0b;图片回显点击图片预览&#xff1a; 是通过uview框架的u-upload进行开发的&#xff0c;先导入uview&#xff01; <template><view class""><!-- 按钮 --><view class"listBtn" click"uploadDesign()…

透过内核收包流程理解DPDK

前言 网络通信作为互联网的底座&#xff0c;其网络服务质量直接影响着用户的上网体验。如微信这类级别的应用&#xff0c;拥有上亿级别的日活&#xff0c;是典型的高并发的场景&#xff0c;简单的堆硬件无法有效的解决该类问题&#xff0c;提高单台服务器的性能成为问题的焦点…

百度文心一言与谷歌Gemini的对比

版权声明 本文原创作者&#xff1a;谷哥的小弟作者博客地址&#xff1a;http://blog.csdn.net/lfdfhl 本文从多角度将百度文心一言与谷歌Gemini进行对比。因为不同评测基准的侧重点和难度可能有所不同&#xff0c;所以本文涉及到的评测结果仅供参考。Gemini和文心一言都是非常…

文件 IO

IO 的概念 I&#xff1a;Input 输入 O&#xff1a;Output 输出 输入和输出的规定 人为规定&#xff1a; 以CPU为视角&#xff0c;数据远离 CPU 的是输出&#xff0c;数据朝着 CPU 过来的是输入 例子&#xff1a; 1.在电脑上&#xff0c;通过网络下载文件 > 数据通过网卡…

IDM的实用功能介绍+下载地址

下载地址 &#xff1a; 下载到idm 互联网下载管理器&#xff08;IDM&#xff09;实用功能概述 1. 多线程下载 IDM使用多线程技术&#xff0c;将文件分割成多个部分同时下载&#xff0c;显著提高下载速度。 2. 计划任务 用户可以设定下载任务的开始时间&#xff0c;甚至在特…

如何解决msvcp140.dll文件丢失的问题?有效修复msvcp140.dll的方法分析

在使用Windows操作系统时&#xff0c;经常会遇到一些烦人的问题&#xff0c;其中&#xff0c;缺少dll文件是比较常见的情况之一。而其中&#xff0c;缺少msvcp140.dll文件是常见的一种情况。今天&#xff0c;我们将重点介绍如何解决msvcp140.dll文件丢失的问题&#xff0c;并向…

Docker 磁盘占用过多问题处理过程记录

一、问题描述 突然发现服务器磁盘使用超过95%了&#xff08;截图时2.1 和 2.2 已经执行过了&#xff09; 二、问题分析与解决 2.1&#xff0c;docker 无用镜像占用磁盘 # 使用 docker images 查看服务的镜像 docker images# 可以手动删除一些很大不用的 docker rmi ***## 也…

javaWeb项目-校园交友网站功能介绍

项目关键技术 开发工具&#xff1a;IDEA 、Eclipse 编程语言: Java 数据库: MySQL5.7 框架&#xff1a;ssm、Springboot 前端&#xff1a;Vue、ElementUI 关键技术&#xff1a;springboot、SSM、vue、MYSQL、MAVEN 数据库工具&#xff1a;Navicat、SQLyog 1、Java语言 Java语…
最新文章