您的位置 首页 golang

kafka全面认知

什么是Kafka

Kafka是⼀款分布式消息发布和订阅系统,它的特点是⾼性能、⾼吞吐量。

最早设计的⽬的是作为LinkedIn的活动流和运营数据的处理管道。这些数据主要是⽤来对⽤户做⽤户画像分析以及服务器性能数据的⼀些监控。

所以kafka⼀开始设计的⽬标就是作为⼀个分布式、⾼吞吐量的消息系统,所以适合运⽤在⼤数据传输场景

Kafka的应⽤场景

由于kafka具有更好的吞吐量、内置分区、冗余及容错性的优点(kafka每秒可以处理⼏⼗万消息),让kafka成为了⼀个很好的⼤规模消息处理应⽤的解决⽅案。所以在企业级应⽤⻓,主要会应⽤于如下⼏个⽅⾯:

⾏为跟踪 :kafka可以⽤于跟踪⽤户浏览⻚⾯、搜索及其他⾏为。通过发布-订阅模式实时记录到对应的topic中,通过后端⼤数据平台接⼊处理分析,并做更进⼀步的实时处理和监控

⽇志收集 :⽇志收集⽅⾯,有很多⽐较优秀的产品,⽐如Apache Flume,很多公司使⽤kafka代理⽇志聚合。⽇志聚合表示从服务器上收集⽇志⽂件,然后放到⼀个集中的平台(⽂件服务器)进⾏处理。在实际应⽤开发中,我们应⽤程序的log都会输出到本地的磁盘上,排查问题的话通过linux命令来搞定,如果应⽤程序组成了负载均衡集群,并且集群的机器有⼏⼗台以上,那么想通过⽇志快速定位到问题,就是很麻烦的事情了。所以⼀般都会做⼀个⽇志统⼀收集平台管理log⽇志⽤来快速查询重要应⽤的问题。所以很多公司的套路都是把应⽤⽇志集中到kafka上,然后分别导⼊到es和hdfs上,⽤来做实时检索分析和离线统计数据备份等。⽽另⼀⽅⾯,kafka本身⼜提供了很好的api来集成⽇志并且做⽇志收集。

Kafka的架构

⼀个典型的kafka集群包含若⼲Producer(可以是应⽤节点产⽣的消息,也可以是通过Flume收集⽇志产⽣的事件),若⼲个Broker(kafka⽀持⽔平扩展)、若⼲个Consumer Group,以及⼀个zookeeper集群。kafka通过zookeeper管理集群配置及服务协同。Producer使⽤push模式将消息发布到broker,consumer通过监听使⽤pull模式从broker订阅并消费消息。

多个broker协同⼯作,producer和consumer部署在各个业务逻辑中。三者通过zookeeper管理协调请求和转发。这样就组成了⼀个⾼性能的分布式消息发布和订阅系统。

图上有⼀个细节是和其他mq中间件不同的点,producer 发送消息到broker的过程是push,⽽consumer从broker消费消息的过程是pull,主动去拉数据。⽽不是broker把数据主动发送给consumer。

名词解释

1)Broker

Kafka集群包含⼀个或多个服务器,这种服务器被称为broker。broker端不维护数据的消费状态,提升了性能。直接使⽤磁盘进⾏存储,线性读写,速度快:避免了数据在JVM内存和系统内存之间的复制,减少耗性能的创建对象和垃圾回收。

2)Producer

负责发布消息到Kafka broker

3)Consumer

消息消费者,向Kafka broker读取消息的客户端,consumer从broker拉取(pull)数据并进⾏处理。

4)Topic

每条发布到Kafka集群的消息都有⼀个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上⼀个Topic的消息虽然保存于⼀个或多个broker上但⽤户只需指定消息的Topic即可⽣产或消费数据⽽不必关⼼数据存于何处)

5)Partition

Parition是物理上的概念,每个Topic包含⼀个或多个Partition.

6)Consumer Group

每个Consumer属于⼀个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)

7)Topic & Partition

Topic在逻辑上可以被认为是⼀个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue⾥。为了使得Kafka的吞吐率可以线性提⾼,物理上把Topic分成⼀个或多个Partition,每个Partition在物理上对应⼀个⽂件夹,该⽂件夹下存储这个Partition的所有消息和索引⽂件。若创建topic1和topic2两个topic,且分别有13个和19个分区,则整个集群上会相应会⽣成共32个⽂件夹(本⽂所⽤集群共8个节点,此处topic1和topic2 replication-factor均为1)。

Java中使⽤kafka进⾏通信

依赖

发送端代码:

 <dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka-clients</artifactId>
     <version>2.0.0</version>
</dependency>  

发送端代码

 public class Producer extends Thread{
      private final KafkaProducer<Integer,String> producer;
      private final String topic;

      public Producer(String topic) {
          Properties properties=new Properties();
          properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.13.102:9092,192.168.13.103
          properties.put(ProducerConfig.CLIENT_ID_CONFIG,"practice-producer");
          properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          IntegerSerializer.class.getName());
          properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          StringSerializer.class.getName());
          producer=new KafkaProducer<Integer, String>(properties);
          this.topic = topic;
      }

      @Override
      public void run() {
          int num=0;
          while(num<50){
              String msg="pratice test message:"+num;
          try {
              producer.send(new ProducerRecord<Integer, String>
              (topic,msg)).get();
              TimeUnit.SECONDS.sleep(2);
              num++;
          } catch (InterruptedException e) {
              e.printStackTrace();
          } catch (ExecutionException e) {
             e.printStackTrace();
          }
         }
      }

      public static void main(String[] args) {
          new Producer("test").start();
      }
}  

消费端代码

 public class Consumer extends Thread{
      private final KafkaConsumer<Integer,String> consumer;
      private final String topic;

      public Consumer(String topic){
          Properties properties=new Properties();
          properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.13.102:9092,192
          .168.13.103:9092,192.168.13.104:9092");
          properties.put(ConsumerConfig.GROUP_ID_CONFIG, "practice-consumer");
          properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
          //设置offset⾃动提交
          properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
          //⾃动提交间隔时间
          properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
          properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.IntegerDeserializer");
          properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringDeserializer");
          properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
          //对于当前groupid来说,消息的offset从最早的消息开始消费
          consumer= new KafkaConsumer<>(properties);
          this.topic=topic;
       }

      @Override
      public void run() {
          while(true) {
              consumer.subscribe(Collections.singleton(this.topic));
              ConsumerRecords<Integer, String> records =
              consumer.poll(Duration.ofSeconds(1));
              records.forEach(record -> {
              System.out.println(record.key() + " " + record.value() + " ->
              offset:" + record.offset());
              });
          }
      }
      
      public static void main(String[] args) {
          new Consumer("test").start();
      }
}  

异步发送

kafka对于消息的发送,可以⽀持同步和异步,前⾯演示的案例中,我们是基于同步发送消息。同步会需要阻塞,⽽异步不需要等待阻塞的过程。

从本质上来说,kafka都是采⽤异步的⽅式来发送消息到broker,但是kafka并不是每次发送消息都会直接发送到broker上,⽽是把消息放到了⼀个发送队列中,然后通过⼀个后台线程不断从队列取出消息进⾏发送,发送成功后会触发callback。kafka客户端会积累⼀定量的消息统⼀组装成⼀个批量消息发送出去,触发条件是前⾯提到的batch.size和linger.ms。

⽽同步发送的⽅法,⽆⾮就是通过future.get()来等待消息的发送返回结果,但是这种⽅法会严重影响消息发送的性能。

 public void run() {
    int num=0;
    while(num<50){
        String msg="pratice test message:"+num;
        try {
            producer.send(new ProducerRecord<>(topic, msg), new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                  System.out.println("callback: "+recordMetadata.offset()+"->"+recordMetadata.partition());
            }});
            TimeUnit.SECONDS.sleep(2);
            num++;
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
     }
}  

batch.size

⽣产者发送多个消息到broker上的同⼀个分区时,为了减少⽹络请求带来的性能开销,通过批量的⽅式来提交消息,可以通过这个参数来控制批量提交的字节数⼤⼩,默认⼤⼩是16384byte,也就是16kb,意味着当⼀批消息⼤⼩达到指定的batch.size的时候会统⼀发送。

linger.ms

Producer默认会把两次发送时间间隔内收集到的所有Requests进⾏⼀次聚合然后再发送,以此提⾼吞吐量,⽽linger.ms就是为每次发送到broker的请求增加⼀些delay,以此来聚合更多的Message请求。这个有点想TCP⾥⾯的Nagle算法,在TCP协议的传输中,为了减少⼤量⼩数据包的发送,采⽤了Nagle算法,也就是基于⼩包的等-停协议。

batch.size和linger.ms这两个参数是kafka性能优化的关键参数,batch.size和linger.ms这两者的作⽤是⼀样的,如果两个都配置了,那么怎么⼯作的呢?实际上,当⼆者都配置的时候,只要满⾜其中⼀个要求,就会发送请求到broker上。

⼀些基础配置分析

group.id

consumer group是kafka提供的可扩展且具有容错性的消费者机制。既然是⼀个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享⼀个公共的ID,即group ID。组内的所有消费者协调在⼀起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同⼀个消费组内的⼀个consumer来消费.如下图所示,分别有三个消费者,属于两个不同的group,那么对于firstTopic这个topic来说,这两个组的消费者都能同时消费这个topic中的消息,对于此时的架构来说,这个firstTopic就类似于ActiveMQ中的topic概念。如右图所示,如果3个消费者都属于同⼀个group,那么此时firstTopic就是⼀个Queue的概念。

enable.auto.commit

消费者 消费位移 的提交⽅式, true 为⾃动提交,即consumer poll消息后⾃动提交上次之前poll的所有消息位移,若为 false 则需要⼿动提交,即consumer poll出的消息需要⼿动提交消息位移,提交消息位移的⽅式有同步提交和异步提交。

auto.commit.interval.ms

在enable.auto.commit 为true的情况下, ⾃动提交消费位移的间隔,默认值5000ms。那么消费者会在poll⽅法调⽤后每隔5000ms(由auto.commit.interval.ms指定)提交⼀次位移。和很多其 他操作⼀样, ⾃动提交消费位移也是由poll()⽅法来驱动的 ;在调⽤poll()时,消费者判断是否到达提交时间(auto.commit.interval.ms指定的值),如果是则提交 上⼀次poll返回的最⼤位移。 具体什么时候提交消息位移,请看这篇[⽂章]。(

auto.offset.reset

这个参数是针对新的groupid中的消费者⽽⾔的,当有新groupid的消费者来消费指定的topic时,对于该参数的配置,会有不同的语义。

auto.offset.reset=latest情况下,新的消费者将会从其他消费者最后消费的offset处开始消费Topic下的消息。

auto.offset.reset= earliest情况下,新的消费者会从该topic最早的消息开始消费。

auto.offset.reset=none情况下,新的消费者加⼊以后,由于之前不存在offset,则会直接抛出异常。

max.poll.records

consumer是通过 轮训的⽅式使⽤poll()⽅法不断获取消息的 ,max.poll.records参数可以限制每次调⽤poll返回的消息数,默认是500条。

max.poll.interval.ms

默认值5分钟,表示若5分钟之内consumer没有消费完上⼀次poll的消息, 也就是在5分钟之内没有调⽤下次的poll()函数 ,那么kafka会认为consumer已经宕机,所以会将该consumer踢出consumer group, 紧接着就会发⽣rebalance,发⽣rebalance可能会发⽣重复消费的情况。

正常消费端伪代码如下

 while (true) {
    //取出消息
    ConsumerRecords<String,String> records = consumer.poll(100);
    for (ConsumerRecord<String,String> record : records) {
    //执⾏消费消息
    dosomething
    }
}  

看到这⾥需要保证poll出的所有消息消费时间总和不能⼤于 max.poll.interval.ms ,如果⼤于则会将consumer踢出consumer group,会进⾏rebalance操作了,所有 每次poll消息的数量不能太⼤,避免发⽣rebalance。

关于Topic和Partition

Topic

在kafka中,topic是⼀个存储消息的逻辑概念,可以认为是⼀个消息集合。每条消息发送到kafka集群的消息都有⼀个类别。物理上来说,不同的topic的消息是分开存储的,每个topic可以有多个⽣产者向它发送消息,也可以有多个消费者去消费其中的消息。

Partition(分区)

每个topic可以划分多个分区(每个Topic⾄少有⼀个分区),同⼀topic下的不同分区包含的消息是不同的,那么为什么要设置多partition呢?第⼀分区存储可以存储更多的消息,其次是为了提⾼吞吐量,如果只有⼀个partition,则所有消息只能存储在该partition内,消费时不管有多少个消费者也只能顺序读取该partition内的消息,如果是多个partition,那么消费者就可以同时从多个partition内并发读取消息,正是这个原因才提⾼了吞吐量。

每个消息在被添加到分区时,都会被分配⼀个offset(称之为偏移量),它是消息在此分区中的唯⼀编号,kafka通过offset保证消息在分区内的顺序,offset的顺序不跨分区,即kafka只保证在同⼀个分区内的消息是有序的。在多partition和多consumer的情况下,⽣产的消息是具有顺序性的,且根据partition的分发策略依次插⼊到相应的partition中,但是由于kafak只保证同⼀个partition内的消息输出有序性,所以多partition依次输出的消息顺序并不能保证和⽣产消息写⼊的顺序是⼀样的。

下图中,对于名字为test的topic,做了3个分区,分别是p0、p1、p2.

每⼀条消息发送到broker时,会根据partition的规则选择存储到哪⼀个partition。如果partition规则设置合理,那么所有的消息会均匀的分布在不同的partition中,这样就有点类似数据库的分库分表的概念,把数据做了分⽚处理。

Topic&Partition的存储

Partition是以⽂件的形式存储在⽂件系统中,⽐如创建⼀个名为firstTopic的topic,其中有3个partition,那么在kafka的数据⽬录(/tmp/kafka-log)中就有3个⽬录,firstTopic-0~3, 命名规则是<topic_name>-<partition_id>

 sh kafka-topics.sh --create --zookeeper 192.168.11.156:2181 --replication-factor 1 --partiti  

关于消息分发

kafka消息分发策略

消息是kafka中最基本的数据单元,在kafka中,⼀条消息由key、value两部分构成,在发送⼀条消息时,我们可以指定这个key,那么producer会根据key和partition机制来判断当前这条消息应该发送并存储到哪个partition中。我们可以根据需要进⾏扩展producer的partition机制。

⾃定义Partitioner

 public class MyPartitioner implements Partitioner {
      private Random random = new Random();
     
      @Override
      public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Clust
          //获取集群中指定topic的所有分区信息
          List<PartitionInfo> partitionInfos=cluster.partitionsForTopic(s);
          int numOfPartition=partitionInfos.size();
          int partitionNum=0;
          if(o==null){
               //key没有设置
               partitionNum=random.nextint(numOfPartition);
               //随机指定分区
          } else{
               partitionNum=Math.abs((o1.hashCode()))%numOfPartition;
          }
          System.out.println("key->"+o+",value->"+o1+"->send to partition:"+partitionNum)
          return partitionNum;
      }
}  

发送端代码添加⾃定义分区

 public KafkaProducerDemo(String topic,Boolean isAysnc){
      Properties properties=new Properties();
      properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
      "192.168.13.102:9092,192.168.13.103:9092,192.168.13.104:9092");
      properties.put(ProducerConfig.CLIENT_ID_CONFIG,"KafkaProducerDemo");
      properties.put(ProducerConfig.ACKS_CONFIG,"-1");
      properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.IntegerSerializer");
      properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer");
      properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.wei.kafka.MyPartitioner");
      producer=new KafkaProducer<Integer, String>(properties);
      this.topic = topic;
      this.isAysnc = isAysnc;   
}  

消息默认的分发机制

默认情况下,kafka采⽤的是hash取模的分区算法。如果Key为null,则会随机分配⼀个分区。这个随机是在这个参数”metadata.max.age.ms”的时间范围内随机选择⼀个。对于这个时间段内,如果key为null,则只会发送到唯⼀的分区。这个值值哦默认情况下是10分钟更新⼀次。关于Metadata,这个之前没讲过,简单理解就是Topic/Partition和broker的映射关系,每⼀个topic的每⼀个partition,需要知道对应的broker列表是什么,leader是谁、follower是谁。这些信息都是存储在Metadata这个类⾥⾯。

消费端如何消费指定的分区

通过下⾯的代码,就可以消费指定该topic下的0号分区。其他分区的数据就⽆法接收

 //消费指定分区的时候,不需要再订阅
//kafkaConsumer.subscribe(Collections.singletonList(topic));
//消费指定的分区
TopicPartition topicPartition=new TopicPartition(topic,0);
kafkaConsumer.assign(Arrays.asList(topicPartition));  

消息的消费原理

在实际⽣产过程中,每个topic都会有多个partitions,多个partitions的好处在于,⼀⽅⾯能够对broker上的数据进⾏分⽚有效减少了消息的容量从⽽提升io性能。另外⼀⽅⾯,提⾼了消费端的消费能⼒,如果只有⼀个partitions,那么多consumer也只能顺序读取该partitions内的消息,如果是多个partitions的话,那么多consumer就可以从多partitions并发⽣读取topic消息,这样就提⾼了消息断的消费能⼒,所以⼀般会 设置多个consumer去消费同⼀个topic的多个partitions , 也就是消费端的负载均衡机制。

这也就是我们接下来要了解的,在多个partition以及多个consumer的情况下,消费者是如何消费消息的。

kafka存在consumer group的概念,也就是group.id⼀样的consumer,这些consumer属于⼀个consumer group,组内的所有消费者协调在⼀起来消费订阅主题的所有分区。 当然每⼀个分区只能由同⼀个消费组内的consumer来消费 ,那么同⼀个consumergroup⾥⾯的consumer是怎么去分配该消费哪个分区⾥的数据呢?如下图所示,3个分区,3个消费者,那么哪个消费者消分哪个分区?

对于上⾯这个图来说,这3个消费者会分别消费test这个topic 的3个分区,也就是每个consumer消费⼀个partition。

演示1(3个partiton对应3个consumer)

Ø 创建⼀个带3个分区的topic

Ø 启动3个消费者消费同⼀个topic,并且这3个consumer属于同⼀个组

Ø 启动发送者进⾏消息发送

演示结果:consumer1会消费partition0分区、consumer2会消费partition1分区、consumer3会消费partition2分区如果是2个consumer消费3个partition呢?会是怎么样的结果?

演示2(3个partiton对应2个consumer)

Ø 基于上⾯演示的案例的topic不变

Ø 启动2个消费这消费该topic

Ø 启动发送者进⾏消息发送

演示结果:consumer1会消费partition0/partition1分区、consumer2会消费partition2分区

演示3(3个partition对应4个或以上consumer)

演示结果:仍然只有3个consumer对应3个partition,其他的consumer⽆法消费消息

通过这个演示的过程,引出接下来需要了解的kafka的分区分配策略(Partition Assignment

Strategy)

consumer和partition的数量建议

1. 如果consumer⽐partition多,是浪费, 因为kafka的设计是在⼀个partition上是不允许并发的,所以consumer数不要⼤于partition数

2. 如果consumer⽐partition少,⼀个consumer会对应于多个partitions,这⾥主要合理分配consumer数和partition数,否则会导致partition⾥⾯的数据被取的不均匀, 被取的不均匀也就代表是消费能⼒不均匀 。 最好partiton数⽬是consumer数⽬的整数倍 ,所以partition数⽬很重要,⽐如取24,就很容易设定consumer数⽬

3. 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在⼀个partition上数据是有序的,但多个partition,根据你读的顺序会有不同

4. 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发⽣变化

什么是分区分配策略

通过前⾯的案例演示,我们应该能猜到,同⼀个group中的消费者对于⼀个topic中的多个partition,存在⼀定的分区分配策略,每个消费者都可以设置⾃⼰的分区分配策略,对于消费组⽽⾔,会从各个消费者上报过来的分区分配策略中选举⼀个彼此都赞同的策略来实现整体的分区分配,这个”赞同”的规则请继续往下看。

在kafka中,存在三种分区分配策略,⼀种是Range(默认)、 另⼀种是RoundRobin(轮询)、StickyAssignor(粘性)。 在消费端中的ConsumerConfig中,通过这个属性来指定分区分配策略

 public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.stra  

RangeAssignor(范围分区)

Range策略是对每个主题⽽⾔的,⾸先对同⼀个主题⾥⾯的分区按照序号进⾏排序,并对消费者按照字⺟顺序进⾏排序。

假设n = 分区数/消费者数量 m= 分区数%消费者数量 , 那么前m个消费者每个分配n+1个分区,后⾯的(消费者数量-m)个消费者每个分配n个分区

假设我们有10个分区,3个消费者,排完序的分区将会是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消费者线程排完序将会是C1-0, C2-0, C3-0。然后将partitions的个数除于消费者线程的总数来决定每个消费者线程消费⼏个分区。如果除不尽,那么前⾯⼏个消费者线程将会多消费⼀个分区。在我们的例⼦⾥⾯,我们有10个分区,3个消费者线程, 10 / 3 = 3,⽽且除不尽,那么消费者线程 C1-0 将会多消费⼀个分区.

结果看起来是这样的:

C1-0 将消费 0, 1, 2, 3 分区

C2-0 将消费 4, 5, 6 分区

C3-0 将消费 7, 8, 9 分区

假如我们有11个分区,那么最后分区分配的结果看起来是这样的:

C1-0 将消费 0, 1, 2, 3 分区

C2-0 将消费 4, 5, 6, 7 分区

C3-0 将消费 8, 9, 10 分区

假如我们有2个主题(T1和T2),分别有10个分区,那么最后分区分配的结果看起来是这样的:

C1-0 将消费 T1主题的 0, 1, 2, 3 分区以及 T2主题的 0, 1, 2, 3分区

C2-0 将消费 T1主题的 4, 5, 6 分区以及 T2主题的 4, 5, 6分区

C3-0 将消费 T1主题的 7, 8, 9 分区以及 T2主题的 7, 8, 9分区

可以看出,C1-0 消费者线程⽐其他消费者线程多消费了2个分区,这种分配⽅式明显的⼀个问题是随着消费者订阅的Topic的数量的增加,不均衡的问题会越来越严重, 也就代表着C1-0这个消费者的消费能⼒会低于C2-0和C3-0消费者,导致的问题直接点说就是消费者的消费能⼒不平衡 ,所以最好的情况就是 partiton数⽬是consumer数⽬的整数倍,可以有效避免这个弊端。

RoundRobinAssignor(轮询分区)

轮询分区策略是把所有partition和所有consumer线程都列出来,然后按照hashcode进⾏排序,注意上⼀种range分区是针对每⼀个topic⽽⾔的,⽽轮训分区是相对于所有的partition和consumer⽽⾔的,最后通过轮询算法分配partition给消费线程。如果消费组内,所有消费者订阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果是尽量均衡的(消费者之间分配到的分区数的差值不会超过1)。如果订阅的Topic列表是不同的,那么分配结果是不保证“尽量均衡”的,因为某些消费者不参与⼀些Topic的分配。

在我们的例⼦⾥⾯,假如按照 hashCode 排序完的topic-partitions组依次为T1-5, T1-3, T1-0, T1-8,

T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我们的消费者线程排序为C1-0, C1-1, C2-0, C2-1(c1和c2consumer group都订阅了t1),最后分区分配的结果为:

C1-0 将消费 T1-5, T1-2, T1-6 分区;

C1-1 将消费 T1-3, T1-1, T1-9 分区;

C2-0 将消费 T1-0, T1-4 分区;

C2-1 将消费 T1-8, T1-7 分区;

相对于RangeAssignor,在订阅多个Topic的情况下,RoundRobinAssignor的⽅式能消费者之间尽量均衡的分配到分区(分配到的分区数的差值不会超过1——RangeAssignor的分配策略可能随着订阅的Topic越来越多,差值越来越⼤)

对于订阅组内消费者订阅Topic不⼀致的情况:假设有三个消费者分别为C1-0、C2-0、C3-0,有3个Topic T1、T2、T3,分别拥有1、2、3个分区,并且C1-0订阅T1,C2-0订阅T1和T2,C3-0订阅T1、T2、T3,那么RoundRobinAssignor的分配结果如下:

看上去分配已经尽量的保证均衡了,不过可以发现C3-0承担了4个分区的消费⽽C2-0和C1-0都是承担⼀个分区,如果T2-1分配给c2-0,均衡性是不是更好呢?带个这个问题,继续下⾯的这次策略。

StrickyAssignor 分配策略

背景

尽管RoundRobinAssignor已经在RangeAssignor上做了⼀些优化来更均衡的分配分区,但是在⼀些情况下依旧会产⽣严重的分配偏差,⽐如消费组中订阅的Topic列表不相同的情况下。 更核⼼的问题是⽆论是RangeAssignor,还是RoundRobinAssignor,当前的分区分配算法都没有考虑上⼀次的分配结果 。显然,在执⾏⼀次新的分配之前,如果能考虑到上⼀次分配的结果,尽量少的调整分区分配的变动,显然是能节省很多开销的。

kafka在0.11.x版本⽀持了StrickyAssignor, 翻译过来叫 粘性策略 ,可以理解为分配结果是带“粘性的”

——每⼀次分配变更相对上⼀次分配做最少的变动(上⼀次的结果是有粘性的),它主要有两个⽬的:

  1. 分区的分配尽可能的均匀
  2. 分区的分配尽可能和上次分配保持相同,也就是 rebalance 之后分区的分配尽量和之前的分区分配相同

当两者发⽣冲突时, 第 ⼀ 个⽬标优先于第⼆个⽬标。 第⼀个⽬标是每个分配算法都尽量尝试去完成的,⽽第⼆个⽬标才真正体现出StickyAssignor特性的。

我们举俩个例⼦来体现StickyAssignor特性

第⼀个例⼦:所有consumer订阅的topic都相同的情况:

  • 有3个Consumer:C0、C1、C2
  • 有4个Topic:T0、T1、T2、T3,每个Topic有2个分区
  • 所有Consumer都订阅了这4个分区

StickyAssignor的分配结果如下图所示(增加RoundRobinAssignor分配作为对⽐):

上⾯的例⼦中,删除C1 consumerre然后balance,RoundRobin策略会将所有分区重新进⾏⼀遍分配,可以看到变动较⼤,⽽Sticky模式原来分配给C0、C2的分区都没有发⽣变动,且最终C0、C1达到的均衡的⽬的,这就体现了 StickyAssignor策略的优越性

再举⼀个例⼦:所有consumer订阅的topic不相同的情况:

  • 有3个Consumer:C0、C1、C2
  • 3个Topic:T0、T1、T2,它们分别有1、2、3个分区
  • C0订阅T0;C1订阅T0、T1;C2订阅T0、T1、T2

分配结果如下图所示:

⾸先在所有consumer订阅的topic不相同的情况下,可以看出StickyAssignor策略相⽐于

RoundRobin策略均衡性更好,体现了StickyAssignor策略的第⼀个特点: 分区的分配尽可能的均匀 ,看到这⾥也解决了我们上节留下的疑问。

其次是,在删除C0消费者进⾏rebalance之后,可以看出使⽤RoundRobin策略的分区会重新进⾏⼀遍RoundRobin,⽽使⽤StickyAssignor策略的分区分配尽可能的和上次保持了最⼩变动。

以上俩个例⼦,完美体现了StickyAssignor策略的优越性。

rebalance触发的场景

在上⾯的例⼦中可以看到rebalance触发的场景⼤致有如下三种情况:

(1)Consumer增加或删除会触发 Consumer Group的Rebalance

(2)Broker的增加或者减少都会触发 Consumer Rebalance

(3)consumer在超过max.poll.interval.ms时间后没有再次poll的操作,kafka会认为该consumer宕机,也就会将该consumer踢出group,触发rebalance

谁来执⾏Rebalance以及管理consumer的group呢?

Kafka提供了⼀个⻆⾊:coordinator来执⾏对于consumer group的管理,当consumer group的第⼀个consumer启动的时候,它会去和kafka server确定谁是它们组的coordinator。之后该group内的所有成员都会和该coordinator进⾏协调通信

如何确定coordinator

consumer group如何确定⾃⼰的coordinator是谁呢, 消费者向kafka集群中的任意⼀个broker发送⼀个GroupCoordinatorRequest请求,服务端会返回⼀个负载最⼩的broker节点的id,并将该broker设置为coordinator

JoinGroup的过程

在rebalance之前,需要保证coordinator是已经确定好了的,整个rebalance的过程分为两个步骤, Join 和 Sync

join: 表示加⼊到consumer group中,在这⼀步中,所有的成员都会向coordinator发送joinGroup的请求。⼀旦所有成员都发送了joinGroup请求,那么coordinator会选择⼀个consumer担任leader⻆⾊,并把组成员信息和订阅信息发送消费者

leader选举算法⽐较简单,如果消费组内没有leader,那么第⼀个加⼊消费组的消费者就是消费者leader,如果这个时候leader消费者退出了消费组,那么重新选举⼀个leader,这个选举很随意,类似于随机算法

protocol_metadata: 序列化后的消费者的订阅信息

leader_id: 消费组中的消费者,coordinator会选择⼀个座位leader,对应的就是member_id

member_metadata 对应消费者的订阅信息

members:consumer group中全部的消费者的订阅信息

generation_id: 年代信息,类似于之前讲解zookeeper的时候的epoch是⼀样的,对于每⼀轮

rebalance,generation_id都会递增。主要⽤来保护consumer group。隔离⽆效的offset提交。也就

是上⼀轮的consumer成员⽆法提交offset到新的consumer group中

每个消费者都可以设置⾃⼰的分区分配策略,对于消费组⽽⾔,会从各个消费者上报过来的分区分配策略中选举⼀个彼此都赞同的策略来实现整体的分区分配,这个”赞同”的规则是,消费组内的各个消费者会通过投票来决定

  • 在joingroup阶段,每个consumer都会把⾃⼰⽀持的分区分配策略发送到coordinator
  • coordinator⼿机到所有消费者的分配策略,组成⼀个候选集
  • 每个消费者需要从候选集⾥找出⼀个⾃⼰⽀持的策略,并且为这个策略投票
  • 最终计算候选集中各个策略的选票数,票数最多的就是当前消费组的分配策略

Synchronizing Group State阶段

完成分区分配之后,就进⼊了Synchronizing Group State阶段,主要逻辑是向GroupCoordinator发送SyncGroupRequest请求,并且处理SyncGroupResponse响应,简单来说,就是leader将消费者对应的partition分配⽅案同步给consumer group 中的所有consumer

每个消费者都会向coordinator发送syncgroup请求,不过只有leader节点会发送分配⽅案,其他消费者只是打打酱油⽽已。当leader把⽅案发给coordinator以后,coordinator会把结果设置到SyncGroupResponse中。这样所有成员都知道⾃⼰应该消费哪个分区。

consumer group的分区分配⽅案是在客户端执⾏的!Kafka将这个权利下放给客户端主要是因为这样做可以有更好的灵活性

总结

我们再来总结⼀下consumer group rebalance的过程

Ø 对于每个consumer group⼦集,都会在服务端对应⼀个GroupCoordinator进⾏管理,GroupCoordinator会在zookeeper上添加watcher,当消费者加⼊或者退出consumer group时,会修改zookeeper上保存的数据,从⽽触发GroupCoordinator开始Rebalance操作

Ø 当消费者准备加⼊某个Consumer group或者GroupCoordinator发⽣故障转移时,消费者并不知道GroupCoordinator的在⽹络中的位置,这个时候就需要确定GroupCoordinator,消费者会向集群中的任意⼀个Broker节点发送ConsumerMetadataRequest请求,收到请求的broker会返回⼀个response作为响应,其中包含管理当前ConsumerGroup的GroupCoordinator,

Ø 消费者会根据broker的返回信息,连接到groupCoordinator,并且发送HeartbeatRequest,发送⼼跳的⽬的是要要奥噶苏GroupCoordinator这个消费者是正常在线的。当消费者在指定时间内没有发送⼼跳请求,则GroupCoordinator会触发Rebalance操作。

Ø 发起join group请求,两种情况

如果GroupCoordinator返回的⼼跳包数据包含异常,说明GroupCoordinator因为前⾯说的⼏种情况导致了Rebalance操作,那这个时候,consumer会发起join group请求新加⼊到consumer group的consumer确定好了GroupCoordinator以后消费者会向GroupCoordinator发起join group请求,GroupCoordinator会收集全部消费者信息之后,来确认可⽤的消费者,并从中选取⼀个消费者成为group_leader。并把相应的信息(分区分配策略、leader_id、…)封装成response返回给所有消费者,但是只有group leader会收到当前consumergroup中的所有消费者信息。当消费者确定⾃⼰是group leader以后,会根据消费者的信息以及选定分区分配策略进⾏分区分配接着进⼊Synchronizing Group State阶段,每个消费者会发送SyncGroupRequest请求到GroupCoordinator,但是只有Group Leader的请求会存在分区分配结果,GroupCoordinator会根据Group Leader的分区分配结果形成SyncGroupResponse返回给所有的Consumer。consumer根据分配结果,执⾏相应的操作

到这⾥为⽌,我们已经知道了消息的发送分区策略,以及消费者的分区消费策略和rebalance。对于应⽤层⾯来说,还有⼀个最重要的东⻄没有讲解,就是offset,他类似⼀个游标,表示当前消费的消息的位置。

如何保存消费端的消费位置

什么是offset

前⾯在讲解partition的时候,提到过offset, 每个topic可以划分多个分区(每个Topic⾄少有⼀个分区),同⼀topic下的不同分区包含的消息是不同的。每个消息在被添加到分区时,都会被分配⼀个offset(称之为偏移量),它是消息在此分区中的唯⼀编号,kafka通过offset保证消息在分区内的顺序,offset的顺序不跨分区,即kafka只保证在同⼀个分区内的消息是有序的; 对于应⽤层的消费来说,每次消费⼀个消息并且提交以后,会保存当前消费到的最近的⼀个offset。那么offset保存在哪⾥?

offset在哪⾥维护?

在kafka中,提供了⼀个consumer_offsets_* 的⼀个topic,把offset信息写⼊到这个topic中。

consumer_offsets——按保存了每个consumer group某⼀时刻提交的offset信息。

__consumer_offsets 默认有50个分区。

根据前⾯我们演示的案例,我们设置了⼀个KafkaConsumerDemo的groupid。⾸先我们需要找到这个consumer_group保存在哪个分区中

properties.put(ConsumerConfig.GROUP_ID_CONFIG,”KafkaConsumerDemo”);

计算公式:

Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount ; 由于默认情况下

groupMetadataTopicPartitionCount有50个分区,计算得到的结果为:35, 意味着当前的

consumer_group的位移信息保存在__consumer_offsets的第35个分区

执⾏如下命令,可以查看当前consumer_goup中的offset位移提交的信息

 kafka-console-consumer.sh --topic __consumer_offsets --partition 15 --bootstrap-server 192.1
--formatter
'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'  

从输出结果中,我们就可以看到test这个topic的offset的位移⽇志

分区的副本机制

我们已经知道Kafka的每个topic都可以分为多个Partition,并且 同⼀topic的多个partition会均匀分布在集群的各个节点下 。虽然这种⽅式能够有效的对数据进⾏分⽚,但是对于每个partition来说,都是单点的,当其中⼀个partition不可⽤的时候,那么这部分消息就没办法消费。所以kafka为了提⾼partition的可靠性⽽提供了副本的概念(Replica),通过副本机制来实现冗余备份。

每个分区可以有多个副本,并且在副本集合中会存在⼀个leader的副本,所有的读写请求都是由leader副本来进⾏处理。剩余的其他副本都作为follower副本,follower副本会从leader副本同步消息⽇志, 和redis cluster中的节点概念相同,leader副本为redis cluster中的主节点,follower副本为redis cluster中的备节点 。

⼀般情况下,同⼀个分区的多个副本会被均匀分配到集群中的不同broker上,当leader副本所在的broker出现故障后,可以重新选举新的leader副本继续对外提供服务。通过这样的副本机制来提⾼kafka集群的可⽤性。

创建⼀个带副本机制的topic

通过下⾯的命令去创建带2个副本的topic

 sh kafka-topics.sh --create --zookeeper 192.168.11.156:2181 --replication-factor 3 --partiti  

然后我们可以在/tmp/kafka-log路径下看到对应topic的副本信息了。我们通过⼀个图形的⽅式来表达。

针对secondTopic这个topic的3个分区对应的3个副本

通常follower副本和leader副本不会在同⼀个broker上,这种是为了保证当leader副本所在broker宕机后,follower副本可继续提供服务。

如何知道哪个各个分区中对应的leader是谁呢?

在zookeeper服务器上,通过如下命令去获取对应分区的信息, ⽐如下⾯这个是获取secondTopic第1个分区的状态信息。

 get /brokers/topics/secondTopic/partitions/1/state  

{“controller_epoch”:12,”leader”:0,”version”:1,”leader_epoch”:0,”isr”:[0,1]} 或通过这个命令

 sh kafka-topics.sh --zookeeper 192.168.13.106:2181 --describe --topic test_partition  

leader表示当前分区的leader是那个broker-id。下图中。绿⾊线条的表示该分区中的leader节点。其他节点就为follower

需要注意的是,kafka集群中的⼀个broker中最多只能有⼀个副本,leader副本所在的broker节点的分区叫leader节点,follower副本所在的broker节点的分区叫follower节点

副本的leader选举机制

Kafka提供了数据复制算法保证,如果leader副本所在的broker节点宕机或者出现故障,或者分区的leader节点发⽣故障,这个时候怎么处理呢?

那么,kafka必须要保证从follower副本中选择⼀个新的leader副本。那么kafka是如何实现选举的呢?

要了解leader选举,我们需要了解⼏个概念

Kafka分区下有可能有很多个副本(replica)⽤于实现冗余,从⽽进⼀步实现⾼可⽤。副本根据⻆⾊的不同,可分为3类:

  • leader副本:响应clients端读写请求的副本
  • follower副本:被动地备份leader副本中的数据,不能响应clients端读写请求。
  • ISR副本:Zookeeper中为每⼀个partition动态的维护了⼀个ISR,这个ISR⾥的所有replica都跟上了leader,只有ISR⾥的成员才能有被选为leader的可能,ISR副本包含了leader副本和所有与leader副本 保持同步 的follower副本,注意是和 保持同步 ,不包含和leader副本 没保持同步 的follower副本

副本协同机制

刚刚提到了,消息的读写操作都只会由leader节点来接收和处理。follower副本只负责同步数据以及

当leader副本所在的broker挂了以后,会从ISR副本中的follower副本中选取新的leader。

写请求⾸先由Leader副本处理,之后follower副本会从leader上拉取写⼊的消息,这个过程会有⼀定的延迟,导致follower副本中保存的消息略少于leader副本,但是只要没有超出阈值都可以容忍。但是如果⼀个follower副本出现异常,⽐如宕机、⽹络断开等原因⻓时间没有同步到消息,那这个时候,leader就会把它踢出去。kafka通过ISR集合来维护⼀个分区副本信息。

⼀个新leader被选举并被接受客户端的消息成功写⼊。Kafka确保从同步副本列表中选举⼀个副本为leader;leader负责维护和跟踪ISR(in-Sync replicas , 副本同步队列)中所有follower滞后的状态。

当producer发送⼀条消息到broker后,leader写⼊消息并复制到所有follower。消息提交之后才被成功复制到所有的同步副本。

ISR

ISR表示⽬前 可⽤且消息量与leader相差不多的副本集合,这是整个副本集合的⼀个⼦集 。怎么去理解可⽤和相差不多这两个词呢?具体来说,ISR集合中的副本必须满⾜两个条件:

  • 副本所在节点必须维持着与zookeeper的连接
  • 副本最后⼀条消息的offset与leader副本的最后⼀条消息的offset之间的差值不能超过指定的阈值。(replica.lag.time.max.ms) replica.lag.time.max.ms:如果该follower在此时间间隔内⼀直没有追上过leader的所有消息,则该follower就会被剔除isr列表,ISR数据保存在Zookeeper的 /brokers/topics/<topic>/partitions/<partitionId>/state 节点中。

follower副本把leader副本前的⽇志全部同步完成时,则认为follower副本已经追赶上了leader副本,这个时候会更新这个副本的lastCaughtUpTimeMs标识,kafka副本管理器会启动⼀个副本过期检查的定时任务,这个任务会定期检查当前时间与副本的lastCaughtUpTimeMs的差值是否⼤于参数replica.lag.time.max.ms 的值,如果⼤于,则会把这个副本踢出ISR集合。

如何处理所有的Replica不⼯作的情况,也可以理解为leader的选举

在ISR中⾄少有⼀个follower时,Kafka可以确保已经commit的数据不丢失,但如果某个Partition的所有Replica都宕机了,就⽆法保证数据不丢失了。这种情况下有两种可⾏的⽅案:

  1. N. 等待ISR中的任⼀个Replica“活”过来,并且选它作为Leader
  2. O. 选择第⼀个“活”过来的Replica(不⼀定是ISR中的)作为Leader,默认配置。

这就需要在可⽤性和⼀致性当中作出⼀个简单的折中。

如果⼀定要等待ISR中的Replica“活”过来,那不可⽤的时间就可能会相对较⻓。⽽且如果ISR中的所有Replica都⽆法“活”过来了,或者数据都丢失了,这个Partition将永远不可⽤。

选择第⼀个“活”过来的Replica作为Leader,⽽这个Replica不是ISR中的Replica,那即使它并不保证已经包含了所有已commit的消息,它也会成为Leader⽽作为consumer的数据源(所有读写都由Leader完成)。

默认情况下Kafka采⽤第⼆种策略,即 unclean.leader.election.enable=true ,也可以将此参数设置为 false 来启⽤第⼀种策略。

副本数据同步原理

了解了副本的协同过程以后,还有⼀个最重要的机制,就是数据的同步过程。

下图中,深红⾊部分表示test_replica分区的leader副本,另外两个节点上浅⾊部分表示follower副本

Producer在发布消息到某个Partition时

  1. 先通过ZooKeeper找到该Partition的Leader get /brokers/topics/<topic>/partitions/2/state ,然后⽆论该Topic的Replication Factor为多少(也即该Partition有多少个Replica),Producer只将该消息发送到该Partition的Leader。
  2. Leader会将该消息写⼊其本地Log。每个Follower都从Leader pull数据。这种⽅式上,Follower存储的数据顺序与Leader保持⼀致。
  3. Follower在收到该消息并写⼊其Log后,向Leader发送ACK。
  4. ⼀旦Leader收到了ISR中的所有Replica的ACK,该消息就被认为已经commit了,Leader将增加HW(HighWatermark)并且向Producer发送ACK。

LEO :即⽇志末端位移(log end offset),记录了该副本底层⽇志(log)中下⼀条消息的位移值。注意是下⼀条消息!也就是说,如果LEO=10,那么表示该副本保存了10条消息,位移值范围是[0, 9]。另外,leader LEO和follower LEO的更新是有区别的,可以看出leader副本和follower副本都有LEO。

HW :即所有follower副本中相对于leader副本最⼩的LEO值。HW是相对leader副本⽽⾔的,其HW值不会⼤于LEO值。⼩于等于HW值的所有消息都被认为是“已备份”的(replicated)。同理,leader副本和follower副本的HW更新是有区别的

通过下⾯这幅图来表达LEO、HW的含义,随着follower副本不断和leader副本进⾏数据同步,follower副本的LEO主键会后移并且追赶到leader副本,这个追赶上的判断标准是当前副本的LEO是否⼤于或者等于leader副本的HW,如果follower在 replica.lag.time.max.ms 时间范围内追赶上了leader副本,该follower副本则加⼊到ISR副本内,也可以使得之前被踢出的follower副本重新加⼊到ISR集合中;如果在 replica.lag.time.max.ms 时间范围内follower副本没追赶上leader副本,该follower副本会被从ISR副本范围内踢出,可以看出 ISR副本是⼀个由zookerper动态监控的变化的副本 。另外, 假如说下图中的最右侧的follower副本被踢出ISR集合,也会导致这个分区的HW发⽣变化,变成了3

数据可靠性和持久性保证

producer数据不丢失

当producer向leader发送数据时,可以通过 request.required.acks 参数来设置数据可靠性的级别:

1、request.required.acks=0,producer写⼊的⼀条消息会⽴即返回ack确认消息,不管leader副本是否同步完或者ISR中的follower副本是否同步完,此配置丢失数据⻛险很⼤,⽣产环境很少使⽤。

2、request.required.acks=1(默认配置)

producer写⼊的⼀条消息后会等到leader副本同步完成(不需要等到ISR内的follower副本同步完成)后⽴即返回给客户端ack消息。该配置的⻛险是如果ISR内的follower副本还没有完成信息同步时,leader节点宕机了,然后通过选举⼀个follower副本做为新的节点,此时就会有数据丢失的问题,相当于mysql的主从同步,优点就是可⽤性强,缺点就是弱⼀致性,可能造成数据丢失。

3、request.required.acks=-1

producer写⼊的⼀ 条消息需要等到分区的leader 副本完成同步,且需要等待 ISR集合中的所有follower副本都同步完 之后才能返回producer确认的ack,这样就避免了部分数据被写进了leader,还没来得及被任何follower复制就宕机了,⽽造成数据丢失,类似于强⼀致性,追求强⼀致性也就意味着可⽤性(响应时间)会降低。设置成-1就可以保证写⼊的数据不丢失了吗?不⼀定,`⽐如当ISR中只有leader副本时(前⾯ISR那⼀节讲到,ISR副本中的成员由于某些情况会增加也会减少,最少就只剩⼀个leader),当leader副本宕机后,所有数据丢失。

为了避免数据的丢失,提⾼可靠性,避免ISR副本中只有⼀个leader副本情况的发⽣,可以使⽤参数min.insync.replicas 来约束,该参数的意思是 设定ISR中的最⼩副本数是多少,总数包含leader副本和follower副本之和 ,如果ISR中的副本数不够参数 min.insync.replicas 所设定的值,客户端会返回异常。

如果由于⽹络原因导致producer push数据失败了,我们可以设置 retries 参数来进⾏重试,总结:

producer消息不丢失需要下⾯3中措施

  1. request.required.acks=-1
  2. 设置min.insync.replicas参数
  3. 设置retries参数

broker数据不丢失

上⾯已经介绍过 unclean.leader.election.enable=false 参数。

这⾥设置 unclean.leader.election.enable=false ,表示:如果ISR副本全部宕机后, 等到ISR副本中的⾥⼀个副本启动之后,并将他作为leader副本.

consumer数据不丢失

enable.auto.commit 该参数默认为true,表明consumer在下次poll消息时⾃动提交上次poll出的所有消息的消费位移,如果设置为false,则需要⽤户⼿动提交⼿动提交所有消息的消费位移。

消息重复消费和消息丢失的场景

当 enable.auto.commit设置为true的时候会有消息重复消费和消息丢失的场景。

当应⽤端消费消息时,还没有提交消费位移的时候,此时kafka出现宕机,那么在kafka恢复之后,这些消息将会重新被消费⼀遍,这就造成了重复消费。

⽐如consumer第⼀次poll出n条消息进⾏消费,达到auto.commit.interval.ms时间后,cosumer会进⾏下⼀次poll并提交上次poll出的n条消息的消费位移。如果第⼀次poll出的n条消息客户端还没有消费完,此时客户端宕机了,当客户端重启后,将会从第⼆次poll的位置开始拉取消息,从⽽丢失第⼀次未提交消费位移的消息,这就造成了数据丢失。

只能避免数据丢失⽽不能解决数据重复

当设置enable.auto.commit为false时,所有的消息位移提交都为⼿动提交了,所有可以避免上⾯提到的数据丢失问题,可以保证consumer消息时数据不会丢失。

⼿动提交有同步提交和异步提交,我们可以选择在应⽤端处理完消息后⼿动提交消费位移。如果在消费完消息准备提交消息位移的时候,应⽤端发⽣了宕机,那么重启之后这些消息还是会被重新消费⼀遍,所以通过配置 enable.auto.commit参数为false只能避免消费端丢失消息⽽不能避免消费端重复消费消息.

Kafka消费者push消息的模式

Kafka的发送模式由producer端的配置参数 producer.type 来设置,这个参数指定了在后台线程中消息的发送⽅式是同步的还是异步的,默认是同步的⽅式,即 producer.type=sync 。如果设置成异步的模式,即 producer.type=async ,可以是producer以 batch 的形式push数据,就是将消息按批量的⽅式发送,⽽不是⼀条⼀条的发送,这样会极⼤的提⾼broker的性能,但是这样会增加丢失数据的⻛险。 如果需要确保消息的可靠性,必须要将producer.type设置为sync。

⾼可靠性配置

要保证数据写⼊到Kafka是安全的,⾼可靠的,需要如下的配置:

分区副本, 你可以创建分区副本来提升数据的可靠性,避免数据丢失,但是分区数过多也会带来性能上的开销,⼀般来说,3个副本就能满⾜对⼤部分场景的可靠性要求

topic的配置:replication.factor>=3,指 副本数 ⾄少是3个; 2<=min.insync.replicas<=replication.factor,指 ISR中的副本数 ⼤于等于2,且⼩于等于3

broker的配置:leader的选举条件unclean.leader.election.enable=false

producer的配置:request.required.acks=-1(all),producer.type=sync

消息的存储,消息的持久化

消息发送端发送消息到broker上以后,消息是如何持久化的呢?那么接下来去分析下消息的存储⾸先我们需要了解的是,kafka是使⽤⽇志⽂件的⽅式来保存⽣产者和发送者的消息,每条消息都有⼀个offset值来表示它在分区中的偏移量。Kafka中存储的⼀般都是海量的消息数据,为了避免⽇志⽂件过⼤,Log并不是直接对应在⼀个磁盘上的⽇志⽂件,⽽是对应磁盘上的⼀个⽬录,这个⽬录的命名规则是<topic_name>_<partition_id>

消息的⽂件存储机制

⼀个topic的多个partition在物理磁盘上的保存路径,路径保存在 /tmp/kafka-logs/topic_partition,包含⽇志⽂件、索引⽂件和时间索引⽂件

kafka是通过分段的⽅式将Log分为多个LogSegment,LogSegment是⼀个逻辑上的概念,⼀个LogSegment对应磁盘上的⼀个⽇志⽂件和⼀个索引⽂件,其中⽇志⽂件是⽤来记录消息的。索引⽂件是⽤来保存消息的索引。那么这个LogSegment是什么呢?

LogSegment

假设kafka以partition为最⼩存储单位,那么我们可以想象当kafka producer不断发送消息,必然会引起partition⽂件的⽆线扩张,这样对于消息⽂件的维护以及被消费的消息的清理带来⾮常⼤的挑战,所以kafka 以segment为单位⼜把partition进⾏细分。每个partition相当于⼀个巨型⽂件被平均分配到多个⼤⼩相等的segment数据⽂件中(每个segment⽂件中的消息不⼀定相等),这种特性⽅便已经被消费的消息的清理,提⾼磁盘的利⽤率。

  • log.segment.bytes=107370 (设置分段⼤⼩),默认是1gb,我们把这个值调⼩以后,可以看到⽇志分段的效果
  • 抽取其中3个分段来进⾏分析

segment file由2⼤部分组成,分别为index file和data file,此2个⽂件⼀⼀对应,成对出现,后缀”.index”和“.log”分别表示为segment索引⽂件、数据⽂件.

segment⽂件命名规则:partion全局的第⼀个segment从0开始,后续每个segment⽂件名为上⼀个segment⽂件最后⼀条消息的offset值进⾏递增。数值最⼤为64位long⼤⼩,20位数字字符⻓度,没有数字⽤0填充

查看segment⽂件命名规则

通过下⾯这条命令可以看到kafka消息⽇志的内容

 sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000  

假如第⼀个log⽂件的最后⼀个offset为:5376,所以下⼀个segment的⽂件命名为:

00000000000000005376.log。对应的index为00000000000000005376.index

segment中index和log的对应关系

从所有分段中,找⼀个分段进⾏分析为了提⾼查找消息的性能,为每⼀个⽇志⽂件添加2个索引索引⽂件:OffsetIndex 和 TimeIndex,分别对应.index以及.timeindex, TimeIndex索引⽂件格式:它是映射时间戳和相对offset,查看索引内容:

 sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000  

如图所示,index中存储了索引以及物理偏移量。 log存储了消息的内容。索引⽂件的元数据执⾏对应数据⽂件中message的物理偏移地址。举个简单的案例来说,以[4053,80899]为例,在log⽂件中,对应的是第4053条记录,物理偏移量(position)为80899. position是ByteBuffer的指针位置

在partition中如何通过offset查找message

查找的算法是

1. 根据offset的值,查找segment段中的index索引⽂件。由于索引⽂件命名是以上⼀个⽂件的最后⼀个offset进⾏命名的,所以,使⽤⼆分查找算法能够根据offset快速定位到指定的索引⽂件。

2. 找到索引⽂件后,根据offset进⾏定位,找到索引⽂件中的符合范围的索引。(kafka采⽤稀疏索引的⽅式来提⾼查找性能)

3. 得到position以后,再到对应的log⽂件中,从position出开始查找offset对应的消息,将每条消息的offset与⽬标offset进⾏⽐较,直到找到消息

⽐如说,我们要查找offset=2490这条消息,那么先找到00000000000000000000.index, 然后找到[2487,49111]这个索引,再到log⽂件中,根据49111这个position开始查找,⽐较每条消息的offset是否⼤于等于2490。最后查找到对应的消息以后返回

Log⽂件的消息内容分析

前⾯我们通过kafka提供的命令,可以查看⼆进制的⽇志⽂件信息,⼀条消息,会包含很多的字段。

offset和position这两个前⾯已经讲过了、 createTime表示创建时间、keysize和valuesize表示key和value的⼤⼩、 compresscodec表示压缩编码、payload:表示消息的具体内容

⽇志的清除策略以及压缩策略

⽇志清除策略

前⾯提到过,⽇志的分段存储,⼀⽅⾯能够减少单个⽂件内容的⼤⼩,另⼀⽅⾯,⽅便kafka进⾏⽇志清理。⽇志的清理策略有两个:

1. 根据消息的保留时间,当消息在kafka中保存的时间超过了指定的时间,就会触发清理过程

2. 根据topic存储的数据⼤⼩,当topic所占的⽇志⽂件⼤⼩⼤于⼀定的阀值,则可以开始删除最旧的消息。kafka会启动⼀个后台线程,定期检查是否存在可以删除的消息

通过log.retention.bytes和log.retention.hours这两个参数来设置,当其中任意⼀个达到要求,都会执⾏删除。默认的保留时间是:7天

⽇志压缩策略

Kafka还提供了“⽇志压缩(Log Compaction)”功能,通过这个功能可以有效的减少⽇志⽂件的⼤⼩,缓解磁盘紧张的情况,在很多实际场景中,消息的key和value的值之间的对应关系是不断变化的,就像数据库中的数据会不断被修改⼀样,消费者只关⼼key对应的最新的value。因此,我们可以开启kafka的⽇志压缩功能,服务端会在后台启动启动Cleaner线程池,定期将相同的key进⾏合并,只保留最新的value值。⽇志的压缩原理是

磁盘存储的性能问题

磁盘存储的性能优化

我们现在⼤部分企业仍然⽤的是机械结构的磁盘,如果把消息以随机的⽅式写⼊到磁盘,那么磁盘⾸先要做的就是寻址,也就是定位到数据所在的物理地址,在磁盘上就要找到对应的柱⾯、磁头以及对应的扇区;这个过程相对内存来说会消耗⼤量时间,为了规避随机读写带来的时间消耗,kafka采⽤顺序写的⽅式存储数据。即使是这样,但是频繁的I/O操作仍然会造成磁盘的性能瓶颈

零拷⻉

消息从发送到落地保存,broker维护的消息⽇志本身就是⽂件⽬录,每个⽂件都是⼆进制保存,⽣产者和消费者使⽤相同的格式来处理。在消费者获取消息时,服务器先从硬盘读取数据到内存,然后把内存中的数据原封不动的通过socket发送给消费者。虽然这个操作描述起来很简单,但实际上经历了很多步骤。

操作系统将数据从磁盘读⼊到内核空间的⻚缓存:

▪ 应⽤程序将数据从内核空间读⼊到⽤户空间缓存中

▪ 应⽤程序将数据写回到内核空间到socket缓存中

▪ 操作系统将数据从socket缓冲区复制到⽹卡缓冲区,以便将数据经⽹络发出

通过“零拷⻉”技术,可以去掉这些没必要的数据复制操作,同时也会减少上下⽂切换次数。现代的unix操作系统提供⼀个优化的代码路径,⽤于将数据从⻚缓存传输到socket;在Linux中,是通过sendfile系统调⽤来完成的。Java提供了访问这个系统调⽤的⽅法:FileChannel.transferTo API使⽤sendfile,只需要⼀次拷⻉就⾏,允许操作系统将数据直接从⻚缓存发送到⽹络上。所以在这个优化的路径中,只有最后⼀步将数据拷⻉到⽹卡缓存中是需要的

⻚缓存

⻚缓存是操作系统实现的⼀种主要的磁盘缓存,但凡设计到缓存的,基本都是为了提升i/o性能,所以⻚缓存是⽤来减少磁盘I/O操作的。

磁盘⾼速缓存有两个重要因素:

第⼀,访问磁盘的速度要远低于访问内存的速度,若从处理器L1和L2⾼速缓存访问则速度更快。

第⼆,数据⼀旦被访问,就很有可能短时间内再次访问。正是由于基于访问内存⽐磁盘快的多,所以磁盘的内存缓存将给系统存储性能带来质的⻜越。

当 ⼀ 个进程准备读取磁盘上的⽂件内容时, 操作系统会先查看待读取的数据所在的⻚(page)是否在⻚缓存(pagecache)中,如果存在(命中)则直接返回数据, 从⽽避免了对物理磁盘的I/0操作;如果没有命中, 则操作系统会向磁盘发起读取请求并将读取的数据⻚存⼊⻚缓存, 之后再将数据返回给进程。

同样,如果 ⼀ 个进程需要将数据写⼊磁盘, 那么操作系统也会检测数据对应的⻚是否在⻚缓存中,如果不存在, 则会先在⻚缓存中添加相应的⻚, 最后将数据写⼊对应的⻚。 被修改过后的⻚也就变成了脏⻚, 操作系统会在合适的时间把脏⻚中的数据写⼊磁盘, 以保持数据的 ⼀ 致性Kafka中⼤量使⽤了⻚缓存, 这是Kafka实现⾼吞吐的重要因素之 ⼀ 。 虽然消息都是先被写⼊⻚缓存,然后由操作系统负责具体的刷盘任务的, 但在Kafka中同样提供了同步刷盘及间断性强制刷盘(fsync),可以通过 log.flush.interval.messages 和 log.flush.interval.ms 参数来控制。

同步刷盘能够保证消息的可靠性,避免因为宕机导致⻚缓存数据还未完成同步时造成的数据丢失。但是实际使⽤上,我们没必要去考虑这样的因素以及这种问题带来的损失,消息可靠性可以由多副本来解决,同步刷盘会带来性能的影响。 刷盘的操作由操作系统去完成即可

文章来源:智云一二三科技

文章标题:kafka全面认知

文章地址:https://www.zhihuclub.com/97868.shtml

关于作者: 智云科技

热门文章

网站地图