kafka实战

What ?

常用场景

  • 系统间的解耦合
  • 峰值压力缓冲
  • 异步(并行)通信
  • 常规的消息系统
    • 不能保证绝对可靠

其它同类产品

  • Rabit MQ
  • Redis
  • ZeroMQ
  • ActiveMQ

producer: 消息生产者

Producer将消息发布到之规定的topic中,同时producer也能决定将此消息归属于哪个partition

consumer:消费者

一个consumer属于一个consumer group 需要保存消费 消息的offset,对于offset的保存和使用,有消 费者consumer来控制 当consumer正常消费消 息时,offset将会"线性"的向前驱动,即消息将 依次顺序被消费

group

  • 每个consumer都有对应的group
  • 各个consumer消费不同的partition
  • 因此一个消息在group内只消费一次

broker

kafka 集群的server服务,负责处理消息的读写请求,存储消息

topic

  • 一个topic 可以认为是一类消息
  • 一个topic 分成多个partition
  • 每个partition内部消息强有序
  • 消息不经过内存缓冲
  • 根据时间策略 7天 删除

partitions

kafka基于文件存储.通过分区,可以将日志内容分散到多个server上,来避免文件尺寸达到单机磁盘的上限,每个partiton都会被当前server(kafka实例)保存;可以将一个topic切分多任意多个partitions,来消息保存/消费的效率.此外越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力.

ISR

in-synco replicas 候选人机制 同步延后

Why ?

消息系统的特点

  • 高性能
  • 有序 FIFO 先进先出
  • 持久性
    • 持久化到磁盘且性能好
    • 采用零拷贝技术
    • 顺序读取
  • 分布式
    • 所有构件可以分布式
  • 很灵活
  • 高吞吐
    • 单节点支持上千个客户端,百兆/s

Where?

http://kafka.apache.org/

How?

wget http://mirror.bit.edu.cn/apache/kafka/0.10.1.0/kafka_2.10-0.10.1.0.tgz

/opt/soft/kafka_2.10-0.10.1.0.tgz

tar zxvf kafka_2.10-0.9.0.1.tgz cd /opt/soft/kafka_2.10-0.9.0.1 vi config/server.properties 修改zookeeper对应地址 zookeeper.connect=sj-node1:2181,sj-node3:2181,sj-node3:2181

scp -r kafka_2.10-0.9.0.1 root@sj-node2:/opt/soft/ scp -r kafka_2.10-0.9.0.1 root@sj-node3:/opt/soft/ scp -r kafka_2.10-0.9.0.1 root@sj-node3:/opt/soft/

分别修改 broker.id 从0开始 后面 分别是 1, 2, 3,4

修改完配置以后在每个节点启动kafka bin/kafka-server-start.sh config/server.properties

测试

  • 新建一个topic ./bin/kafka-topics.sh -zookeeper sj-node1:2181,sj-node2:2181,sj-node3:2181 -topic test -replication-factor 2 -partitions 5 --create
  • 查看当前的topic ./bin/kafka-topics.sh -zookeeper sj-node1:2181,sj-node2:2181,sj-node3:2181 -list
  • 在一个节点创建一个provider ./bin/kafka-console-producer.sh --broker-list sj-node1:9092,sj-node2:9092,sj-node3:9092 --topic test
  • 在另外一个节点创建一个consumer ./bin/kafka-console-consumer.sh --zookeeper sj-node1:2181,sj-node2:2181,sj-node3:2181 --from-beginning -topic test
  • 删除topic,不会立即生效 ./bin/kafka-topics.sh --delete --topic test --zookeeper sj-node1:2181,sj-node2:2181,sj-node3:2181
  • 查看明细 ./bin/kafka-topics.sh --zookeeper sj-node1:2181 --describe --topic testflume

问题

  • kafka offset 维护
    • zookeeper维护 0.9版本以前
    • kafka自己维护 0.9版本以后
  • kafka offset 数据存储机制
    • https://www.cnblogs.com/ITtangtang/p/8027217.html

彻底删除Kafka中的topic

  1. 删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关topic目录
  2. Kafka 删除topic的命令是: ./bin/kafka-topics.sh --delete --zookeeper 【zookeeper server】 --topic 【topic name】

如果kafaka启动时加载的配置文件中server.properties没有配置delete.topic.enable=true,那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion

  1. 此时你若想真正删除它,可以登录zookeeper客户端: 命令:./bin/zookeeper-client 找到topic所在的目录:ls /brokers/topics 找到要删除的topic,执行命令:rmr /brokers/topics/【topic name】即可,此时topic被彻底删除。

问题

  1. 数据丢失
  • procedure端 只有一个主机,接收数据保存在内存,如果死机,该段时间数据丢失 可以用个备份主机,发送方 ack机制 0:只发送不管leader接收是否成功,1:leader接收成功就下一条,-1:等所有备份机接收成功才下一条
  • consumer端 如果是低级api,更新偏移量之后,执行的代码错误,导致该条数据未正常处理,如果是高级api,处理时间大于自动提交时间,则至少有一条丢失 解决方式:关闭自动提交,改成手动提交
  1. 数据重复消费 0.8版本默认是自动60s提交一次偏移量,如果在自动提交之前出错,已经处理过的偏移量未更新,则下次执行的时候会把那60s内处理过的数据再消费一次 解决方式:关闭自动提交,改成手动提交
Juforg wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!