Kafka日常维护

在软件项目的生命周期中,开发只占开始的一小部分,大部分时间我们要对项目进行运行维护,Kafka相关的项目也不例外。所以这里介绍了一些Kafka服务器基础的运维方法。

关于Kafka的日志

日志的英语是“log”,但Kafka的数据文件也被称为log,所以很多时候会造成一定的歧义。在Kafka中,日志分为两种:

  • 数据日志
  • 操作日志

数据日志是指Kafka的topic中存储的数据,这种日志的路径是在$KAFKA_HOME/config/server.properties文件中配置,配置项为log.dirs。如果此项没有被配置,默认会使用配置项 log.dir(请仔细观察,两个配置项最后差了一个s)。log.dir的默认路径为/tmp/kafka-logs,大家知道,/tmp路径下的文件在计算机重启的时候是会被删除的,因此,强烈推荐将文件目录设置在其他可以永久保存的路径。另一种日志是操作日志,类似于我们在自己开发的程序中输出的log日志(log4j),这种日志的路径是在启动Kafka的路径下。比如一般我们在KAFKA_HOMEl路径下启动kafka服务,操作的日志路径为 KAFKA_HOME/logs。

数据日志清理

数据日志有两种类型的清理方式,一种是按照日志被发布的时间来删除,另一种是按照日志文件的size来删除。有专门的配置项可以配置这个删除策略:

按时间删除:

Kafka提供了配置项让我们可以按照日志被发布的时间来删除。它们分别是:

  • log.retention.ms
  • log.retention.minutes
  • log.retention.hours

根据配置项的名称很容易理解它们的含义。log.retention.ms表示日志会被保留多少毫秒,如果为null,则Kafka会使用使用log.retention.minutes配置项。log.retention.minutes表示日志会保留多少分钟,如果为null,则Kafka会使用log.retention.hours选项。默认情况下,log.retention.ms和log.retention.minutes均为null,log.retention.hours为168,即Kafka的数据日志默认会被保留7天。如果想修改Kafka中数据日志被保留的时间长度,可以通过修改这三个选项来实现。

按size删除

Kafka除了提供了按时间删除的配置项外,也提供了按照日志文件的size来删除的配置项:

  • log.retention.bytes

即日志文件到达多少byte后再删除日志文件。默认为-1,即无限制。需要注意的是,这个选项的值如果小于segment文件大小的话是不起作用的。segment文件的大小取决于log.segment.bytes配置项,默认为1G。
另外,Kafka的日志删除策略并不是非常严格的(比如如果log.retention.bytes设置了10G的话,并不是超过10G的部分就会立刻删除,只是被标记为待删除,Kafka会在恰当的时候再真正删除),所以请预留足够的磁盘空间。当磁盘空间剩余量为0时,Kafka服务会被kill掉。

操作日志清理

目前Kafka的操作日志暂时不提供自动清理的机制,需要运维人员手动干预,比如使用shell和crontab命令进行定时备份、清理等。

Kafka配置IP访问

在默认情况下,访问Kafka集群需要在/etc/hosts文件下配置所有Kafka集群的hostname,这样比较麻烦。可以通过配置advertised.listeners配置项来使用ip来访问Kafka集群,比如单台Kafka broker的ip地址为192.168.1.100,那么可以使用如下配置来使客户端来使用ip访问:

advertised.listeners=PLAINTEXT://192.168.1.100:9092

Kafka的基本操作

接下来简单介绍一下关于Kafka集群的基本操作。所有这些用到的工具全部在Kafka目录下的/bin目录下可以找到。如果不添加任何参数运行这些工具脚本,控制台上会打印使用的提示。

添加topic

在Kafka中添加topic有两种方式,一种是手动添加,另一种是在数据第一次写入的时候动态添加。一般情况下还是推荐手动添加topic。
使用topic工具创建topic:

bin/kafka-topics.sh –zookeeper zk_host:port/chroot –create –topic my_topic_name –partitions 20 –replication-factor 3 –config x=y

replication-factor参数用来指定需要多少个副本(连同leader在内),一般比较推荐设置为2或3。如果设置太少(比如1)导致可用性下降,如果设置太大会影响Kafka的性能。

partitions参数用来设置topic有多少个partition。首先,每个partition必须存在于单台服务器上,就是说如果某个topic有20个partition,那么它们将最多分布在20台服务器上(不算replicas)。另外很重要的一点是,partition的数量将直接影响消费端程序的并行度。所以创建topic时要根据实际情况决定partition数量。

修改topic

修改topic主要是为了修改topic的partition数量和topic层面的配置。修改partition数量可以使用此命令:

bin/kafka-topics.sh –zookeeper zk_host:port –alter –topic my_topic_name –partitions 40

关于修改partition数量,有两点需要注意:

  1. partition只能增加,不支持减少
  2. 新增的partition只会对将来写入的数据起作用,以前存在的数据不会被移动到新的partition中

除了修改partition数量,我们还可以修改topic层面的配置,增加配置项:

bin/kafka-topics.sh –zookeeper zk_host:port/chroot –alter –topic my_topic_name –config x=y

删除配置项:

bin/kafka-topics.sh –zookeeper zk_host:port/chroot –alter –topic my_topic_name –delete-config x

删除topic

Kafka删除topic的方法也很简单。首先,需要在config/server.properties文件中配置

delete.topic.enable=true

这个配置项默认是关闭的,也就是不支持删除,我们需要手动配置为true。然后使用命令:

./bin/kafka-topics.sh –zookeeper zk_host:port –delete –topic my_topic_name

就可以删除topic了。

查看消费者位置

有些时候我们需要查看某个消费者组的消费情况,我们可以使用工具kafka-consumer-groups.sh来查看。

./bin/kafka-consumer-groups.sh –bootstrap-server localhost:9092 –describe –group test

查看当前正在使用的消费者组

自从kafka 0.9.0.0版本,Kafka引入了新的Java consumer模块代替了原来的Scala consumer模块。消费者组的元数据从zookeeper迁移到了broker中,所以使用新的API查看消费者组列表:

./bin/kafka-consumer-groups.sh –bootstrap-server localhost:9092 –list

查看topic的每个partition状态

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell –broker-list localhost:9092 –topic test –time -1

扩展Kafka集群

向Kafka集群添加服务器是很简单的,分配给它们一个唯一的broker id并且在新服务器上启动Kafka服务器就可以了。但是这些新服务器却不能被自动分配数据分区(partition),所以除非partition被移动到这些服务器上,否者这些服务器直到新建topic的时候才会发挥作用。所以,通常情况下当你添加服务器到你的集群中时,你都希望迁移一些已经存在的数据到这些服务器上。

迁移数据的的过程是手动启动的,但是执行是自动化的。内在过程就是Kafka将这些新服务器添加作为它们正在迁移的partition的follower并且允许它全部复制这个分区中已存在的数据。当新服务器完全复制此分区的内容并加入同步副本时,现有副本之一将删除其分区的数据。

重新分配分区工具(partition reassignment tool)可以被用来在brokers间移动partition。理想的partition分配会确保跨broker的均匀的数据负载和分区大小。重新分配分区工具并没有自动获取Kafka集群中的数据分布信息并均衡负载的能力。因此,管理员必须清楚要移动哪些topic或partition。

重新分配分区工具有以下三种互斥的运行模式:

  • generate:在这种模式下,给出topic列表和broker列表,分区工具将生成一个候选的分区方案用来将指定topic的所有分区移动到新broker上。这个选项仅仅提供了一种通过给出的topic列表和broker列表生成分区重新分配方案的简便方式。
  • execute:这种模式下,分区工具将根据用户提供的重新分区方案开始重新分区(使用 — –reassignment-json-file )。这个选项既可以执行管理员手写的重新分区文件也可以执行通过–generate命令生成的重新分区文件。
  • verify:这种模式下,分区工具验证上次执行-generate以来被重新分区的所有partition的状态。这些状态可能是successfully completed(成功完成), failed(失败)或in progress(进行中)

自动迁移数据到新的服务器上

重新分区工具可以将当前brokers上的topics移动到新的brokers上。这在扩展现有集群时很有用,因为将整个topic移动到新brokers上比一次移动一个partition更容易。然后,分区工具可以在新的brokers中均匀分配给定topic集合中的所有partition。在移动过程中,replication factor(副本数)保持不变。

举例来说,下面的例子将会把topics foo1、foo2的所有partition移动到新的brokers 5,6。当移动行为结束以后,foo1和foo2的所有partition将仅存在于broker 5、6。

因为分区工具接受将topic列表写入json文件的形式,所以你需要指定需要移动的topics并创建json文件,就像下面这样:

cat topics-to-move.json
{ “topics”: [{“topic”: “foo1”}, {“topic”: “foo2”}], “version”:1 }

当json文件准备好 了以后,使用重新分区工具自动生成一个候选的重新分配计划:

bin/kafka-reassign-partitions.sh –zookeeper localhost:2181 –topics-to-move-json-file topics-to-move.json –broker-list “5,6” –generate</br>
Current partition replica assignment</br>
{“version”:1,
“partitions”:[{“topic”:”foo1″,”partition”:2,”replicas”:[1,2]},
{“topic”:”foo1″,”partition”:0,”replicas”:[3,4]},
{“topic”:”foo2″,”partition”:2,”replicas”:[1,2]},
{“topic”:”foo2″,”partition”:0,”replicas”:[3,4]},
{“topic”:”foo1″,”partition”:1,”replicas”:[2,3]},
{“topic”:”foo2″,”partition”:1,”replicas”:[2,3]}]

}</br>
    Proposed partition reassignment configuration</br>
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
                {"topic":"foo1","partition":0,"replicas":[5,6]},
                {"topic":"foo2","partition":2,"replicas":[5,6]},
                {"topic":"foo2","partition":0,"replicas":[5,6]},
                {"topic":"foo1","partition":1,"replicas":[5,6]},
                {"topic":"foo2","partition":1,"replicas":[5,6]}]
  }

分区工具生成的候选计划将会将topic foo1,foo2的所有分区移动到broker 5,6。记住,尽管到了这一步,partition的移动却尚未开始,而仅仅告诉你当前的分区情况(Current partition replica assignment)和建议的新分区方案(Proposed partition reassignment configuration)。当前分区情况应该保留起来,以防你想要回滚。新分区方案应该被存入一个josn文件中,(比如:expand-cluster-reassignment.json),然后作为分区工具 –execute选项的输入文件。就像下面这样:

bin/kafka-reassign-partitions.sh –zookeeper localhost:2181 –reassignment-json-file expand-cluster-reassignment.json –execute</br>
Current partition replica assignment</br>
{“version”:1,
“partitions”:[{“topic”:”foo1″,”partition”:2,”replicas”:[1,2]},
{“topic”:”foo1″,”partition”:0,”replicas”:[3,4]},
{“topic”:”foo2″,”partition”:2,”replicas”:[1,2]},
{“topic”:”foo2″,”partition”:0,”replicas”:[3,4]},
{“topic”:”foo1″,”partition”:1,”replicas”:[2,3]},
{“topic”:”foo2″,”partition”:1,”replicas”:[2,3]}]
}</br>
Save this to use as the –reassignment-json-file option during rollback
Successfully started reassignment of partitions
{“version”:1,
“partitions”:[{“topic”:”foo1″,”partition”:2,”replicas”:[5,6]},
{“topic”:”foo1″,”partition”:0,”replicas”:[5,6]},
{“topic”:”foo2″,”partition”:2,”replicas”:[5,6]},
{“topic”:”foo2″,”partition”:0,”replicas”:[5,6]},
{“topic”:”foo1″,”partition”:1,”replicas”:[5,6]},
{“topic”:”foo2″,”partition”:1,”replicas”:[5,6]}]
}

最后,–verify选项可以用来检查重新分配分区的状态。注意此处应该使用与执行 –execute 选项时相同的expand-cluster-reassignment.json 文件:

bin/kafka-reassign-partitions.sh –zookeeper localhost:2181 –reassignment-json-file expand-cluster-reassignment.json –verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo1,1] is in progress
Reassignment of partition [foo1,2] is in progress
Reassignment of partition [foo2,0] completed successfully
Reassignment of partition [foo2,1] completed successfully
Reassignment of partition [foo2,2] completed successfully

自定义分区分配和移动

分区重新分配工具也可以被用来移动broker被选中的分区。当使用这种方式的时候,我们假设用户了解重新分配分区方案并且不需要使用分区工具去自动生成候选重新分区方案。为了高效,我们跳过了–generate步骤,直接来到–execute步骤

举例来说,下面的例子将topic foo1的partition 0移动到 broker 5,6,将topic foo2的partition 1 移动到broker 2,3:
第一步来手写自定义重新分区计划的json文件:

cat custom-reassignment.json
{“version”:1,”partitions”:[{“topic”:”foo1″,”partition”:0,”replicas”:[5,6]},{“topic”:”foo2″,”partition”:1,”replicas”:[2,3]}]}

然后,使用–execute选项和json文件来开始重新分配分区的操作:

bin/kafka-reassign-partitions.sh –zookeeper localhost:2181 –reassignment-json-file custom-reassignment.json –execute
Current partition replica assignment</br>
{“version”:1,
“partitions”:[{“topic”:”foo1″,”partition”:0,”replicas”:[1,2]},
{“topic”:”foo2″,”partition”:1,”replicas”:[3,4]}]
}</br>
Save this to use as the –reassignment-json-file option during rollback
Successfully started reassignment of partitions
{“version”:1,
“partitions”:[{“topic”:”foo1″,”partition”:0,”replicas”:[5,6]},
{“topic”:”foo2″,”partition”:1,”replicas”:[2,3]}]
}

用–verify选项来查看被重新分配分区的状态。注意此处应该使用与执行 –execute 选项时相同的expand-cluster-reassignment.json 文件:

bin/kafka-reassign-partitions.sh –zookeeper localhost:2181 –reassignment-json-file custom-reassignment.json –verify
Status of partition reassignment:

  Reassignment of partition [foo1,0] completed successfully
  Reassignment of partition [foo2,1] completed successfully

发表评论