Kafka命令行操作
在%KAFKA_HOME%\bin\windows
目录下再开启一个测试 Kafka 的命令
一、topic 命令行操作
参数 | 描述 |
---|---|
bootstrap-server <String: server toconnect to> | 连接的 Kafka Broker 主机名称和端口号 |
–topic <String: topic> | 操作的 topic 名称 |
–create | 创建主题 |
–delete | 删除主题 |
–alter | 修改主题 |
–list | 查看所有主题 |
–describe | 查看主题详细描述 |
–partitions <Integer: # of partitions> | 设置分区数(必须) |
–replication-factor<Integer: replication factor> | 设置分区副本(必须),副本数不能超过 broker 数 |
–config <String: name=value> | 更新系统默认的配置 |
1、创建 topic
1 | kafka-topics.bat --bootstrap-server localhost:9092 --create --replication-factor 3 --partitions 1 --topic firstTest |
2、查看当前服务器中的所有 topic
1 | kafka-topics.bat --bootstrap-server localhost:9092 --list |
3、查看某个 Topic 的详情
1 | kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic firstTest |
说明:
Leader
是负责给定 Partition 的所有读写的节点。每个节点都可能成为 Partition 随机选择的 leader。这里就是 broker.id=0 的是 leader。Replicas
是复制此 Partition 日志的节点列表,无论它们是 leader 还是当前处于存活状态。Isr
是一组In-sync replicas
。这是 Replicas 列表的一个子集,它当前处于存活状态,并补充 leader。
4、修改分区数(分区数只能增加,不能减少)
1 | kafka-topics.bat --bootstrap-server localhost:9092 --alter --topic firstTest --partitions 3 |
查看修改后的结果
5、删除指定 topic
1 | kafka-topics.bat --bootstrap-server localhost:9092 --delete --topic firstTest |
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.需要 server.properties 中设置 delete.topic.enable=true ,否则只是标记删除
删除 topic 后遇到一个问题:kafka 直接挂掉了,而且重启也失败,错误信息如下(暂时解决办法只能是删除整个日志目录)
ERROR Shutdown broker because all log dirs in D:\tmp\kafka1 have failed (kafka.log.LogManager)
二、生产者命令行操作
参数 | 描述 |
---|---|
–bootstrap-server <String: server toconnect to> | 连接的 Kafka Broker 主机名称和端口号 |
–topic <String: topic> | 操作的 topic 名 |
发送消息
1 | kafka-console-producer.bat --bootstrap-server localhost:9092 --topic firstTest |
然后可以输入消息内容
1 | D:\My\project\kafka_2.12-2.8.1\bin\windows>kafka-console-producer.bat --bootstrap-server localhost:9092 --topic firstTest |
三、消费者命令行操作
参数 | 描述 |
---|---|
–bootstrap-server <String: server toconnect to> | 连接的 Kafka Broker 主机名称和端口号 |
–topic <String: topic> | 操作的 topic 名 |
–from-beginning | 从头开始消费(包括历史消息) |
–group <String: consumer group id> | 指定消费者组名 |
不要关闭上面的生产者窗口,新建一个 cmd 窗口
使用消费者消费消息(从头开始,包括历史消息)
1 | kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic firstTest --from-beginning |
从当前开始消费 firstTest 主题中的数据
1 | kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic firstTest |
本文采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 ShiGuang
评论