来玩一玩消息队列的王者——kafka
Kafka
概念
基于发布、订阅模式的消息中间件
没有消息队列的话,两个进程通信是直接连接
同步的效率会比较低,异步处理在执行之后还可以执行其他的。
缓冲也叫削峰。
使用消息队列的好处
-
解耦
允许独立的拓展或修改两边的处理过程,只要确保他们遵守同样的接口约束
-
可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程之间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理
-
缓冲
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况
-
灵活性&峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命,无疑是一个巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃
消息队列的两种模式
-
点对点模式
一对一,消费者主动拉取数据,消息收到后消息清除,消息只能给一个人。
消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。消息被消费之后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
-
发布/订阅模式
一对多,消费者消费数据之后不会清除消息
消息生产者(发布)将消息发布到topic中,同时有豆哥消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有的订阅者消费
这个保留也是有一定的期限的,因为它只是一个消息队列,不是一个数据库系统。kafka是一个发布/订阅的模式。
可能会有这么一种情况,不想要的数据也会收到。
如果消息队列的速度和消费者的速度不太一样,可能会造成资源浪费。所以订阅/发布的这种模式,一般有两种实现方式
- 消费者主动去拉消息
- 消息队列主动推
kafka是消费者主动拉消息的模式。
kafka的缺点:
消费者自己需要轮询来查看消息队列中有没有数据。有时会比较浪费consumer的资源。
基础架构
Producer
消费生产者,想kafka broker发消息的客户端
Consumer
消息消费者,向kafka broker取消息的客户端
Consumer Group(CG)
消费者组,由一个或者多个consumer组成。消费者组内每个消费者负责消费不同分取的数据,一个分区只能由一个组内消费者消费,消费者组之间互不影响所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
Broker
一台kafka服务器就是一个broker,一个集群由多个broker组成。一个broker可以容纳多个topic
Topic
可以理解为一个队列,生产者和消费者面相的都是一个topic
Partion
为了实现拓展性,一个非常大的topic可以分布到多个broker(也就是服务器)上,一个topic可以分为多个partion,每个partion是一个有序的队列
Replica
副本,为了保证集群中某个节点发生故障时,该节点上的partion数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower
Leader
每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader
follower
每个分区多个副本的“从”,实时从leader中同步数据,保持和leader数据的同步,leader发生故障时,某个follower会成为新的leader
生产者生产消息
kafka集群管理消息(数据)
消费者消费消息
zookeeper注册消息
0.9之前消费者记录数据的位置是靠存放到zk里面的偏移量,0.9之后是存到了kafka本地的系统的topic中(kafka消息是存到磁盘中)
为什么这样做?
原来的方式给zk的要有点太大了,在高并发情况下边取数据边记录位置,会很麻烦
安装
docker 安装
下载镜像
sudo docker pull wurstmeister/zookeeper
sudo docker pull wurstmeister/kafka
启动zookeeper
sudo docker run -d --name zookeeper-container -p 2181 -t wurstmeister/zookeeper
启动kafka
sudo docker run -d --name kafka --publish 9092:9092 --link zookeeper-container --env KAFKA_ZOOKEEPER_CONNECT=zookeeper-container:2181 --env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 --env KAFKA_ADVERTISED_PORT=9092 --volume /etc/localtime:/etc/localtime wurstmeister/kafka
进入到kafka
sudo docker exec -it kafka /bin/bash
进入到kafka默认目录
cd /opt/kafka
下载
➜ apps ls
kafka_2.11-0.11.0.0.tgz
➜ apps tar -zxvf kafka_2.11-0.11.0.0.tgz # 解压
➜ apps ls
kafka_2.11-0.11.0.0 kafka_2.11-0.11.0.0.tgz
➜ mv kafka_2.11-0.11.0.0 kafka # 改名
cd kafka/config
➜ config ll
total 128
-rw-r--r--@ 1 cjp staff 906B 6 23 2017 connect-console-sink.properties
-rw-r--r--@ 1 cjp staff 909B 6 23 2017 connect-console-source.properties
-rw-r--r--@ 1 cjp staff 5.7K 6 23 2017 connect-distributed.properties
-rw-r--r--@ 1 cjp staff 883B 6 23 2017 connect-file-sink.properties
-rw-r--r--@ 1 cjp staff 881B 6 23 2017 connect-file-source.properties
-rw-r--r--@ 1 cjp staff 1.1K 6 23 2017 connect-log4j.properties
-rw-r--r--@ 1 cjp staff 2.7K 6 23 2017 connect-standalone.properties
-rw-r--r--@ 1 cjp staff 1.2K 6 23 2017 consumer.properties
-rw-r--r--@ 1 cjp staff 4.6K 6 23 2017 log4j.properties
-rw-r--r--@ 1 cjp staff 1.9K 6 23 2017 producer.properties
-rw-r--r--@ 1 cjp staff 6.8K 6 23 2017 server.properties# 主要改这个
-rw-r--r--@ 1 cjp staff 1.0K 6 23 2017 tools-log4j.properties
-rw-r--r--@ 1 cjp staff 1.0K 6 23 2017 zookeeper.properties
配置文件
server.properties
# broker的全局唯一编号,不能重复
broker.id=0
# 删除topic功能使能
delete.topic.enable=true
# 处理网络请求的线程数量
num.network.threads=3
# 用来处理磁盘io的线程数量
num.io.threads=8
# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
# 接收套接字的缓冲区大小
socket.receive.bffer.bytes=102400
# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600
# kafka里面暂存数据的目录,和日志的名字很像,但是不是日志
log.dirs=/tmp/kafka-logs
# topic在定钱broker上的分区个数
num.partitions=1
# 用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
# 数据保存的时间,过期将被清除
log.retention.hours=168
# log文件的大小
log.segment.bytes=1073741824
# 配置连接zookeeper集群地址
zookeeper.connect=localhost:2181
配置环境变量
$ vim ./etc/profile
# KAFKA_HOME
export KAFKA_HOME=/Users/cjp/apps/kafka
export PATH=$PATH:$KAFKA_HOME/bin
$ source /etc/profile
启动kafka
查看bin目录
➜ bin ll
# 这两个主要用在测试环境中,一般不怎么用
-rwxr-xr-x@ 1 cjp staff 945B 6 23 2017 kafka-console-consumer.sh # 控制台的消费者
-rwxr-xr-x@ 1 cjp staff 944B 6 23 2017 kafka-console-producer.sh # 控制台的生产者
-rwxr-xr-x@ 1 cjp staff 1.3K 6 23 2017 kafka-server-start.sh # 开始
-rwxr-xr-x@ 1 cjp staff 975B 6 23 2017 kafka-server-stop.sh· # 停止
-rwxr-xr-x@ 1 cjp staff 863B 6 23 2017 kafka-topics.sh # topic的增删改查
执行命令
bin/kafka-server-start.sh config/server.properties
但是这样直接启动会有一个问题就是kafka会一直占用终端,还有一种方法就是把kafka设置为守护进程(仅在hadoop环境中有效)
bin/kafka-server-start.sh -daemon config/server.properties
hadoop启动脚本
kafka命令操作
1)查看当前服务器中所有的topic
bin/kafka-topics.sh --zookeeper localhost:2181 --list
2)创建topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic first
选项说明
–topic 定义topic名
–replication-factor 定义副本数(副本数不能超过集群的数量)
–partitions 定义分区数
3)删除topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic first
需要server.properties中方设置delete.topic.enable=true否则只是标记删除
4)发送消息
➜ kafka bin/kafka-console-producer.sh --broker-list localhost:9092 --topic first
>hello kafka, i am cjp, your master.
5)消费消息
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic first
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic first
-
建议用–bootstrap-server代替–zookeeper
-
用控制台这边比较特殊,要用–from-beginning
6)查看分区详情
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic first
- 修改分区数
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic first --partitions 6