接上篇 Kafka 在 Windows 下集群搭建

%KAFKA_HOME%\bin\windows目录下再开启一个测试 Kafka 的命令

一、topic 命令行操作

参数描述
bootstrap-server <String: server toconnect to>连接的 Kafka Broker 主机名称和端口号
–topic <String: topic>操作的 topic 名称
–create创建主题
–delete删除主题
–alter修改主题
–list查看所有主题
–describe查看主题详细描述
–partitions <Integer: # of partitions>设置分区数(必须)
–replication-factor<Integer: replication factor>设置分区副本(必须),副本数不能超过 broker 数
–config <String: name=value>更新系统默认的配置

1、创建 topic

1
kafka-topics.bat --bootstrap-server localhost:9092 --create --replication-factor 3 --partitions 1 --topic firstTest

2、查看当前服务器中的所有 topic

1
kafka-topics.bat --bootstrap-server localhost:9092  --list

3、查看某个 Topic 的详情

1
kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic firstTest

说明:

  • Leader 是负责给定 Partition 的所有读写的节点。每个节点都可能成为 Partition 随机选择的 leader。这里就是 broker.id=0 的是 leader。
  • Replicas 是复制此 Partition 日志的节点列表,无论它们是 leader 还是当前处于存活状态。
  • Isr是一组 In-sync replicas。这是 Replicas 列表的一个子集,它当前处于存活状态,并补充 leader。

4、修改分区数(分区数只能增加,不能减少)

1
kafka-topics.bat --bootstrap-server localhost:9092 --alter --topic firstTest --partitions 3

查看修改后的结果

5、删除指定 topic

1
kafka-topics.bat --bootstrap-server localhost:9092 --delete --topic firstTest

Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

需要 server.properties 中设置 delete.topic.enable=true ,否则只是标记删除

删除 topic 后遇到一个问题:kafka 直接挂掉了,而且重启也失败,错误信息如下(暂时解决办法只能是删除整个日志目录)

ERROR Shutdown broker because all log dirs in D:\tmp\kafka1 have failed (kafka.log.LogManager)

二、生产者命令行操作

参数描述
–bootstrap-server <String: server toconnect to>连接的 Kafka Broker 主机名称和端口号
–topic <String: topic>操作的 topic 名

发送消息

1
kafka-console-producer.bat --bootstrap-server localhost:9092 --topic firstTest

然后可以输入消息内容

1
2
3
D:\My\project\kafka_2.12-2.8.1\bin\windows>kafka-console-producer.bat --bootstrap-server localhost:9092 --topic firstTest
>hello
>hi

三、消费者命令行操作

参数描述
–bootstrap-server <String: server toconnect to>连接的 Kafka Broker 主机名称和端口号
–topic <String: topic>操作的 topic 名
–from-beginning从头开始消费(包括历史消息)
–group <String: consumer group id>指定消费者组名

不要关闭上面的生产者窗口,新建一个 cmd 窗口

使用消费者消费消息(从头开始,包括历史消息)

1
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic firstTest --from-beginning

从当前开始消费 firstTest 主题中的数据

1
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic firstTest