kafka

## kafka简单概念 #### 1、Kaka的设计时什么样的呢? ``` Kafka将消息以topic为单位进行归纳,将向Kafka topic发布消息的程序成为producers, 将预订topics并消费消息的程序成为consumer, Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker, producers通过网络将消息发送到Kafka集群,集群向消费者 提供消息. ``` #### 2、Kaka判断一个节点是否还活着有那两个条件? ``` (1)节点必须可以维护和ZooKeeper的连接,Zookeeper通过心跳机制检查每个节点的连接 (2)如果节点是个follower,他必须能及时的同步leader的写操作,延时不能太久 ``` #### 3、producer是否直接将数据发送到broker的leader(主节点)? ``` producer直接将数据发送到broker的leader(主节点),不需要在多个节点进行分发,为了帮助producer做到这点,所有的Kafka节点都可以及时的告知:哪些节点是活动的,目标topic目标分区的leader在哪。这样producer就可以直接将消息发送到目的地了. ``` #### 4、Kafka consumer是否可以消费指定分区消息? ``` Kafka consumer消费消息时,向broker发出"fetch"请求去消费特定分区的消息,consumer指定消息在日志中的偏移量(offset),就可以消费从这个位置开始的消息,customer拥有了offset的控制权,可以向后回滚去重新消费之前的消息,这是很有意义的 ``` #### 5、Kafka高效文件存储设计特点: ``` (1).Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。 (2).通过索引信息可以快速定位message和确定response的最大大小。 (3).通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。 (4).通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。 ``` #### 6、Kafka 与传统消息系统之间有三个关键区别 ``` (1).Kafka持久化日志,这些日志可以被重复读取和无限期保留 (2).Kafka是一个分布式系统:它以集群的方式运行,可以灵活伸缩,在内部通过复制数据提升容错能力和高可用性 (3).Kafka支持实时的流式处理 ``` #### 7、Kaka创建Topic时如何将分区放置到不同的Broker中 ``` 副本因子不能大于Broker 的个数; 第一个分区(编号为0)的第一个副本放置位置是随机从 brokerList 选择的; 其他分区的第一个副本放置位置相对于第0个分区依次往后 也就是如果我们有5个Broker,5个分区,假设第一个分区放在第四个Broker上,那么第二个分区将会放在第五个Broker上;第三个分区将会放在第一个Broker上;第四个分区将会放在第二个Broker上,依次类推;剩余的副本相对于第一个副本放置位置其实是由nextReplicaShift 决定的,而这个数也是随机产生. ``` #### 8、Kafka中的消息是否会丢失和重复消费? ``` 要确定Kafka的消息是否丢失或重复,从两个方面分析入手:消息发送和消息消费。 1、消息发送 Kafka消息发送有两种方式:同步(sync)和异步(async),默认是同步方式,可通过producer.type属性进行 配置。Kafka通过配置request.required.acks属性来确认消息的生产: 0---表示不进行消息接收是否成功的确认; 1---表示当Leader接收成功时确认; -1---表示Leader和Follower都接收成功时确认; (1)acks=0,不和Kafka集群进行消息接收确认,则当网络异常、缓冲区满了等情况时,消息可能丢失; (2)acks=1、同步模式下,只有Leader确认接收成功后但挂掉了,副本没有同步,数据可能丢失; 2、消息消费 Kafka消息消费有两个consumer接口,Low-level API和High-level API: Kafka消息发送有两种方式:同步和异步,默认是同步方式,可通过producer.type属性进行配置。 Kafka通过配置request.required.acks属性来确认消息的生产: Low-level API:消费者自己维护offset等值,可以实现对Kafka的完全控制; High-level API:封装了对parition和offset的管理,使用简单; 如果使用高级接口High-level API,可能存在一个问题就是当消息消费者从集群中把消息取出来、并提交 了新的消息offset值后,还没来得及消费就挂掉了,那么下次再消费时之前没消费成功的消息就“诡异”的消失了; 解决办法: 针对消息丢失:同步模式下,确认机制设置为-1,即让消息写入Leader和Follower之后再确认消息发送成功; 异步模式下,为防止缓冲区满,可以在配置文件设置不限制阻塞超时时间,当缓冲区满时让生产者一直处于阻塞状态; 针对消息重复:将消息的唯一标识保存到外部介质中,每次消费时判断是否处理过即可 ``` #### 9、为什么Kafka不支持读写分离? ``` (1)数据一致性问题。 (2)延时问题。对延时敏感的应用而言,主写从读的功能并不太适用。 ``` #### 10、kafka中的broker 是干什么的 ``` broker 是消息的代理,Producers往Brokers里面的指定Topic中写消息,Consumers从Brokers里面拉 取指定Topic的消息,然后进行业务处理,broker在中间起到一个代理保存消息的中转站。 ``` #### 11、kafka中的 zookeeper 起到什么作用,可以不用zookeeper么 ``` zookeeper 是一个分布式的协调组件,broker依然依赖于ZooKeeper,zookeeper 在kafka中还用来选举controller 和 检测broker是否存活等等 ``` ## kafka基础操作 #### 创建topic ```shell # bin/kafka-topics.sh --create --bootstrap-server 10.1.125.60:9092 --replication-factor 1 --partitions 3 --topic weblog # 老版本kafka需要指定zookeeper # bin/kafka-topics.sh --create --zookeeper 10.1.125.60:2181 --replication-factor 1 --partitions 3 --topic weblog ``` #### 修改topic副本 ```shell # ./kafka-topics.sh --alter --partitions 4 --topic weblog --bootstrap-server 10.1.125.60:9092 ``` #### 查看kafka的所有topic ```shell # bin/kafka-topics.sh --zookeeper 10.1.125.60:2181 --list ``` #### 查看单个topic的详细信息 ```shell # bin/kafka-topics.sh --zookeeper 10.1.125.60:2181 --topic weblog --describe Topic:weblog PartitionCount:3 ReplicationFactor:1 Configs: Topic: weblog Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: weblog Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Topic: weblog Partition: 2 Leader: 0 Replicas: 0 Isr: 0 ``` #### 查询topic里内容: ```shell # bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic weblog --from-beginning ``` #### 查看消费者consumer group列表 ```shell # bin/kafka-consumer-groups.sh --bootstrap-server 10.1.125.60:9092 --list consumer-test # 删除消费组 # kafka-consumer-groups --bootstrap-server kafka:9092 --delete --group consumer-test ``` #### 查看消费者consumer group详情 ```shell # bin/kafka-consumer-groups.sh --bootstrap-server 10.1.125.60:9092 --group consumer-test --describe GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID consumer-test weblog 0 - 173127 - sarama-78301b04-84e5-4d9d-a487-df463b6e426e /172.16.1.108 sarama consumer-test weblog 1 - 172995 - sarama-78301b04-84e5-4d9d-a487-df463b6e426e /172.16.1.108 sarama consumer-test weblog 2 - 171922 - sarama-78301b04-84e5-4d9d-a487-df463b6e426e /172.16.1.108 sarama ``` ## kafka日志保留时间设置无效问题 看了网上很多文档,说是要设置log.retention.hour等等参数。 默认是保留7天,但我实测下来发现日志根本没有任何变化。 目前我们的[kafka](https://so.csdn.net/so/search?q=kafka&spm=1001.2101.3001.7020),一天就有400多个G,保留七天大大增加了我们的服务器成本。 不生效的设置 ``` #设置日志只保留一个小时的 "log.retention.hours": 1 #设置大于500M就自动删除 "log.retention.bytes": "536870912" #设置日志被标记删除后,保留多久彻底删除,1分钟 "log.segment.delete.delay.ms": 60000 #设置多久扫描一次要清理的日志 "log.cleanup.interval.mins": 3 ``` ### kafka回收原理 kafka只会回收上个分片的数据 配置没有生效的原因就是,数据并没有分片,所以没有回收 ### kafka什么时候分片? 有个参数log.roll.hours log.roll.hours 设置多久滚动一次,滚动也就是之前的数据就会分片分出去 segment.bytes 设置日志文件到了多大就会自动分片 总结 ``` "log.retention.hours": 1 "log.cleanup.policy": "delete" "log.retention.bytes": "536870912" "log.segment.delete.delay.ms": 60000 "log.cleanup.interval.mins": 3 "log.roll.hours": 1 "segment.bytes": "536870913" "log.retention.check.interval.ms": 120000 "retention.ms": "3600000" "retention.bytes": "536870912" ``` 建议 log.roll.hours retention.ms log.retention.hours 设置的时间相同