kafka删除topic数据

发布时间:2020-02-21 10:27:31编辑:admin阅读(4233)

    一、概述

    生产环境中,有一个topic的数据量非常大。这些数据不是非常重要,需要定期清理。

    要求:默认保持24小时,某些topic 需要保留2小时或者6小时

    1.png

    二、清除方式

    主要有3个:

    1. 基于时间

    2. 基于日志大小

    3. 基于日志起始偏移量

     

    详情,请参考链接:

    https://blog.csdn.net/u013256816/article/details/80418297

     

    接下来,主要介绍基于时间的清除!

    kafka版本为:  2.11-1.1.0

    zk版本为:  3.4.13

     

    三、kafka配置

    # 启用删除主题
    delete.topic.enable=true
    # 检查日志段文件的间隔时间,以确定是否文件属性是否到达删除要求。
    log.retention.check.interval.ms=1000

    注意:这2行配置必须存在,否则清除策略失效!

    log.retention.check.interval.ms 参数的单位是毫秒,这里表示间隔1秒钟

     

    四、清除策略

    全局topic

    在 server.properties 文件中配置的是全局策略,针对每一个topic

    比如:

    log.retention.hours=3
    表示保留3个小时

     

    单个topic

    针对单个topic策略,需要使用脚本kafka-configs.sh

    此脚本不需要重启kafka就会生效!

     

    首先来查看一下,当前的topic策略,比如test

    bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --describe --entity-type topics --entity-name test

    参数解释:

    --describe  详细信息

    --entity-type 实体类型

    --entity-name 指定topic名

     

    输出:

    Configs for topic 'test' are

    这个表示为策略为空

     

    删除topic数据

    如果需要删除topic所有数据,使用命令

    bin/kafka-topics.sh --delete --topic test --zookeeper zookeeper-1.default.svc.cluster.local:2181

    这个命令,请谨慎执行!!!

     

    如果想保留主题,只删除主题现有数据(log)。可以通过修改数据保留时间实现

    bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=10000

    执行输出:

    Completed Updating config for entity: topic 'test'.

    注意:修改保留时间为10秒钟,并不是10秒钟就马上删掉。kafka是采用轮询的方式,轮询到这个topic时,删除10秒钟前的数据。

    时间由server.properties里面的log.retention.check.interval.ms选项为主

     

    假设说 log.retention.check.interval.ms 值为1分钟,那么等待70秒,这个topic的数据就会自动被删除!

     

    再次查看topic策略

    bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --describe --entity-type topics --entity-name test

    输出:

    Configs for topic 'test' are retention.ms=10000

    发现目前的删除策略为 retention.ms=10000

     

    删除策略

    如果需要删除上面的10秒策略,使用以下命令:

    bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --entity-type topics --entity-name test --alter --delete-config retention.ms

    输出:

    Completed Updating config for entity: topic 'test'.

    再次查看topic策略

    bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --describe --entity-type topics --entity-name test

    输出:

    Configs for topic 'test' are

    发现策略为空,说明删除成功了!

     

    五、测试清除策略

    测试思路

    1.png

     

    说明:

    第一步,设置清除策略为保留10秒

    第二步,进入生产者模式,输入消息 a

    第三步,等待5秒,再次进入生产者模式,输入消息 b

    第四部,进入消费者模式,看输出的消息是a还是b

     

    判断标准:

    在进行第三步时,a这条消息,应该已经被删除了。所以在第15秒进入消费者模式时,应该输出 b,这样的话,策略才是成功的!

     

    设置策略

    topic 为test的数据保留10秒

    bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=10000

     

    生产模式

    进入生产模式,输入a

    bin/kafka-console-producer.sh --broker-list kafka-1.default.svc.cluster.local:9092 --topic test
    > a

    等待5秒后,再次进入生产模式,输入b

    bin/kafka-console-producer.sh --broker-list kafka-1.default.svc.cluster.local:9092 --topic test
    > b

    消费者模式

    等待5秒后,进入 消费者模式

    bin/kafka-console-consumer.sh --bootstrap-server kafka-1.default.svc.cluster.local:9092 --topic test --from-beginning
    
    b

    如果消费者输出为b,表示策略成功!


    备注:

    如果生产环境中,正在不断的进行生产和消费,执行kafka-configs.sh 脚本,是否会有影响呢?

    答案是不会的,它是动态策略!

     

     

    本文参考链接:

    https://blog.csdn.net/forrest_ou/article/details/78999983


关键字