什么是Kafka
在流式计算中,Kafka一般用来缓存数据,Storm通过消费Kafka的数据进行计算。
Apache Kafka是一个开源消息系统,由Scala写成。是由Apache软件基金会开发的一个开源消息系统项目。
Kafka最初是由LinkedIn公司开发,并于 2011年初开源。2012年10月从Apache Incubator毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。
Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。
无论是kafka集群,还是producer和consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性。
准备基础环境
安装JDK8官网
下载地址:https://file.lolicp.com/java/jdk-8u241-linux-x64.rpm
[root@master-all ~]# rpm -ivh jdk-8u241-linux-x64.rpm
下载kafka安装包官网
下载地址:https://file.lolicp.com/linux/kafka_2.12-2.5.0.tgz
[root@master-all ~]# wget http://apache.communilink.net/kafka/2.5.0/kafka_2.12-2.5.0.tgz
[root@master-all ~]# tar xzf kafka_2.12-2.5.0.tgz
[root@master-all ~]# cp -r kafka_2.12-2.5.0/ /usr/local/kafka
[root@master-all ~]# cd /usr/local/kafka/config/
修改配置文件
Kafka
[root@master-all config]# cp server.properties{,.bak}
[root@master-all config]# vim server.properties
以下为配置文件内容:
#集群唯一ID
broker.id=1
#监听端口
port=9092
advertised.host.name=192.168.230.202
advertised.port=9092
#服务器用于从网络接收请求并向网络发送响应的线程数
num.network.threads=2
#服务器用于处理请求的线程数,其中可能包括磁盘I/O
num.io.threads=8
#套接字服务器使用的发送缓冲区(SO_SNDBUF)
socket.send.buffer.bytes=1048576
#套接字服务器使用的接收缓冲区(SO_RCVBUF)
socket.receive.buffer.bytes=1048576
#套接字服务器将接受的请求的最大大小(针对OOM的保护)
socket.request.max.bytes=104857600
#用逗号分隔的目录列表,用于存储日志文件
log.dirs=/usr/local/kafka/logs/kafka
#每个主题的默认日志分区数。 更多的分区允许更大的并行使用,但这也将导致代理中的文件更多。
num.partitions=2
#日志文件因年龄而可以删除的最小年龄
log.retention.hours=168
#日志段文件的最大大小。 达到此大小后,将创建一个新的日志段
log.segment.bytes=536870912
#检查日志段以了解是否可以根据保留策略将其删除的时间间隔
log.retention.check.interval.ms=60000
#日志清除
log.cleaner.enable=false
#Zookeeper地址,多个zookeeper地址用","分隔
zookeeper.connect=192.168.230.202:2181
#以毫秒为单位的连接到Zookeeper的超时时间
zookeeper.connection.timeout.ms=1000000
#允许删除topic
delete.topic.enable=true
#topic的offset的备份份数,建议设置更高的数字保证更高的可用性
offsets.topic.replication.factor=1
Zookeeper
[root@master-all config]# vim zookeeper.properties
以下为配置文件内容:
#数据目录
dataDir=/usr/local/kafka/logs/zookeeper
#客户端端口
clientPort=2181
#禁用每个连接的IP限制,因为这是非生产配置
maxClientCnxns=0
#默认情况下禁用adminserver,以避免端口冲突。
admin.enableServer=false
#如果选择启用端口,则将端口设置为无冲突
# admin.serverPort=8080
[root@master-all config]# cd ../bin/
操作命令
启动zookeeper
[root@master-all bin]# ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
停止
[root@kafka01 bin]# ./zookeeper-server-stop.sh
启动kafka
[root@master-all bin]# ./kafka-server-start.sh -daemon ../config/server.properties
停止
[root@kafka01 bin]# ./kafka-server-stop.sh
-daemon为后台运行
创建topic
- replication-factor : 副本数量
- partitions : 分区数量
- topic : 定义topic名称
[root@master-all bin]# ./kafka-topics.sh --zookeeper 192.168.230.202:2181 --create --replication-factor 1 --partitions 1 --topic ym68
Created topic ym68.
查看topic list
[root@master-all bin]# ./kafka-topics.sh --list --zookeeper 192.168.230.202:2181
ym68
启动生产者
./kafka-console-producer.sh --broker-list 192.168.230.202:9092 --topic ym68
启动消费者
[root@master-all bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.230.202:9092 --from-beginning --topic ym68
kafka版本0.9以前用此命令为:
[root@master-all bin]# ./kafka-console-consumer.sh --zookeeper 192.168.230.202:2181 --from-beginning --topic ym68
删除topic
[root@master-all bin]# ./kafka-topics.sh --zookeeper 192.168.230.202:2181 --delete --topic ym68
Topic ym68 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
测试
[root@master-all bin]# ./kafka-console-producer.sh --broker-list 192.168.230.202:9092 --topic ym68
>hello01
>hello world
[root@master-all bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.230.202:9092 --from-beginning --topic ym68
hello01
hello world
遇到的问题
1.默认备份份数为3,修改配置文件为一
[2020-04-26 00:25:29,228] ERROR [KafkaApi-1] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
在配置文件中添加:
offsets.topic.replication.factor=1
2.没有java
[root@master-all bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties
/usr/local/kafka_1/bin/kafka-run-class.sh: line 315: exec: java: not found
安装最新版jdk8即可
[root@master-all ~]# rpm -ivh jdk-8u241-linux-x64.rpm