下载地址:http://kafka.apache.org/downloads
按提示下载:
解压后文件目录:
修改配置文件config/server.properties
常用配置如下:
#broker 的全局唯一编号,不能重复
broker.id=0
#删除 topic 功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接 Zookeeper 集群地址
zookeeper.connect=localhost:2181
启停命令:
bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-server-stop.sh config/server.properties
# 停止可以通过jps获取进程id然后kill
bin/kafka-topics.sh --zookeeper localhost:2181 --list
bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 3 --partitions 1 --
topic first
--选项说明:
--topic 定义 topic 名
--replication-factor 定义副本数
--partitions 定义分区数
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic first
bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic first --partitions 6
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic first
> hello kafka
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic first
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic first
# --from-beginning:会把主题中以往所有的数据都读取出来。
spring官网文档:https://spring.io/projects/spring-kafka
版本兼容:
症状:Producer连不上,提示没有可用Node。
解决:在安装kafka的目录中配置server.properties