更新时间:2023年10月19日11时37分 来源:传智教育 浏览次数:
在Apache Kafka中,HW(High Watermark)和 LEO(Log End Offset)是与分区的复制和消息传递相关的两个关键概念。
High Watermark是一个分区的消息复制进度的指示器。它表示了已经成功复制到所有副本的消息的位置。HW之前的所有消息都被认为是已提交的消息,这意味着消费者可以安全地消费这些消息。HW通常是消费者组维护的偏移量的参考点。
Log End Offset表示一个分区中消息日志的最后一个位置,即下一条消息要写入的位置。LEO是动态变化的,因为消息不断被追加到分区。它表示了分区中的最新消息位置。
接下来笔者用一段具体的示例代码,来演示下如何使用Java和Kafka Consumer API来获取分区的HW和LEO:
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class KafkaHWLEOExample { public static void main(String[] args) { // 设置Kafka消费者的配置 Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // 创建Kafka消费者 Consumer<String, String> consumer = new KafkaConsumer<>(props); // 指定要订阅的主题 String topic = "my-topic"; consumer.subscribe(Collections.singletonList(topic)); // 获取分区信息 PartitionInfo partitionInfo = consumer.partitionsFor(topic).get(0); int partition = partitionInfo.partition(); // 在消费者循环中获取HW和LEO while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (TopicPartition topicPartition : records.partitions()) { long hw = consumer.position(topicPartition); // 获取HW long leo = consumer.endOffsets(Collections.singleton(topicPartition)).get(topicPartition); // 获取LEO System.out.println("Partition " + topicPartition.partition() + ": HW = " + hw + ", LEO = " + leo); } } } }
上面的代码创建了一个Kafka消费者,并订阅了一个主题。在消费者循环中,我们使用position()方法来获取分区的HW,并使用endOffsets()方法来获取分区的LEO。这可以帮助我们监视分区的消息复制进度和消息日志的结束位置。