深入理解 Kafka 主题分区机制

news/2025/2/22 6:26:54

分布式消息系统中,Apache Kafka 的主题分区机制是其核心特性之一。它不仅提供了高吞吐量和可扩展性,还通过分区实现了消息的有序存储和高效消费。本文将通过详细的代码示例和分析,帮助读者深入理解 Kafka 的主题分区机制。
一、Kafka 分区的基本概念
在 Kafka 中,每个主题(Topic)被划分为多个分区(Partition)。分区是 Kafka 存储消息的基本单位,每个分区是一个有序的、不可变的消息序列。消息在分区中被分配一个唯一的偏移量(Offset),用于标识消息在分区中的位置。生产者(Producer)在发送消息时可以指定分区,也可以让 Kafka 自动分配分区。消费者(Consumer)按照偏移量顺序读取消息,从而保证消息的顺序性。
二、创建主题和分区
在 Kafka 中,可以通过 Admin API 创建主题并指定分区数量。以下是一个简单的 Java 示例代码,展示如何使用 Kafka 的 AdminClient 创建主题:
java复制
package com.logicbig.example;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class TopicCreator {
public static void main(String[] args) throws Exception {
createTopic(“example-topic-1”, 1);
createTopic(“example-topic-2”, 2);
}

private static void createTopic(String topicName, int numPartitions) throws Exception {
    Properties config = new Properties();
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    AdminClient admin = AdminClient.create(config);

    // 检查主题是否已存在
    boolean alreadyExists = admin.listTopics().names().get().stream()
            .anyMatch(existingTopicName -> existingTopicName.equals(topicName));

    if (alreadyExists) {
        System.out.printf("主题已存在: %s%n", topicName);
    } else {
        // 创建新主题
        System.out.printf("创建主题: %s%n", topicName);
        NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
        admin.createTopics(Collections.singleton(newTopic)).all().get();
    }

    // 描述主题
    System.out.println("-- 描述主题 --");
    admin.describeTopics(Collections.singleton(topicName)).all().get()
            .forEach((topic, desc) -> {
                System.out.println("主题: " + topic);
                System.out.printf("分区数量: %s, 分区ID: %s%n", desc.partitions().size(),
                        desc.partitions().stream().map(p -> Integer.toString(p.partition())).collect(Collectors.joining(",")));
            });
}

}
运行上述代码后,会创建两个主题:example-topic-1 和 example-topic-2,分别包含 1 个和 2 个分区。
三、消息发送与分区
(一)指定分区发送消息
生产者在发送消息时可以显式指定分区。以下代码展示了如何向单分区主题发送消息:
java复制
package com.logicbig.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class PartitionExample1 {
private static int PARTITION_COUNT = 1;
private static String TOPIC_NAME = “example-topic-1”;
private static int MSG_COUNT = 4;

public static void main(String[] args) throws Exception {
    KafkaProducer<String, String> producer = ExampleHelper.createProducer();
    for (int i = 0; i < MSG_COUNT; i++) {
        String value = "message-" + i;
        String key = Integer.toString(i);
        producer.send(new ProducerRecord<>(TOPIC_NAME, 0, key, value));
    }
}

}
运行结果如下:
复制
发送消息主题: example-topic-1, key: 0, value: message-0, 分区: 0
发送消息主题: example-topic-1, key: 1, value: message-1, 分区: 0
发送消息主题: example-topic-1, key: 2, value: message-2, 分区: 0
发送消息主题: example-topic-1, key: 3, value: message-3, 分区: 0
(二)多分区主题的消息发送
对于多分区主题,生产者可以将消息发送到不同的分区:
java复制
package com.logicbig.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class PartitionExample2 {
private static int PARTITION_COUNT = 2;
private static String TOPIC_NAME = “example-topic-2”;
private static int MSG_COUNT = 4;

public static void main(String[] args) throws Exception {
    KafkaProducer<String, String> producer = ExampleHelper.createProducer();
    for (int i = 0; i < MSG_COUNT; i++) {
        for (int partitionId = 0; partitionId < PARTITION_COUNT; partitionId++) {
            String value = "message-" + i;
            String key = Integer.toString(i);
            producer.send(new ProducerRecord<>(TOPIC_NAME, partitionId, key, value));
        }
    }
}

}
运行结果如下:
复制
发送消息主题: example-topic-2, key: 0, value: message-0, 分区: 0
发送消息主题: example-topic-2, key: 0, value: message-0, 分区: 1
发送消息主题: example-topic-2, key: 1, value: message-1, 分区: 0
发送消息主题: example-topic-2, key: 1, value: message-1, 分区: 1
发送消息主题: example-topic-2, key: 2, value: message-2, 分区: 0
发送消息主题: example-topic-2, key: 2, value: message-2, 分区: 1
发送消息主题: example-topic-2, key: 3, value: message-3, 分区: 0
发送消息主题: example-topic-2, key: 3, value: message-3, 分区: 1
(三)不指定分区发送消息
如果生产者不显式指定分区,Kafka 会根据默认的分区策略(通常基于消息的键)选择分区:
java复制
package com.logicbig.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class PartitionExample3 {
private static String TOPIC_NAME = “example-topic-2”;
private static int MSG_COUNT = 4;

public static void main(String[] args) throws Exception {
    KafkaProducer<String, String> producer = ExampleHelper.createProducer();
    for (int i = 0; i < MSG_COUNT; i++) {
        String value = "message-" + i;
        String key = Integer.toString(i);
        producer.send(new ProducerRecord<>(TOPIC_NAME, key, value));
    }
}

}
运行结果如下:
复制
发送消息主题: example-topic-2, key: 0, value: message-0, 分区: 未指定
分区分配: 0
发送消息主题: example-topic-2, key: 1, value: message-1, 分区: 未指定
分区分配: 1
发送消息主题: example-topic-2, key: 2, value: message-2, 分区: 未指定
分区分配: 0
发送消息主题: example-topic-2, key: 3, value: message-3, 分区: 未指定
分区分配: 1
四、消息消费与分区
消费者按照分区顺序读取消息。以下代码展示了如何消费单分区和多分区主题的消息:
java复制
package com.logicbig.example;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerExample {
public static void main(String[] args) throws Exception {
KafkaConsumer<String, String> consumer = ExampleHelper.createConsumer(“example-topic-2”);
int numMsgReceived = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
for (ConsumerRecord<String, String> record : records) {
numMsgReceived++;
System.out.printf(“消费消息: key = %s, value = %s, 分区ID = %s, 偏移量 = %s%n”,
record.key(), record.value(), record.partition(), record.offset());
}
consumer.commitSync();
if (numMsgReceived >= 8) {
break;
}
}
}
}
运行结果如下:
复制
消费消息: key = 0, value = message-0, 分区ID = 0, 偏移量 = 0
消费消息: key = 1, value


http://www.niftyadmin.cn/n/5861790.html

相关文章

深入解析 Uniapp 的页面结构

一、引言 Uniapp 是一个使用 Vue.js 开发跨平台应用的前端框架&#xff0c;它能让开发者通过编写一套代码&#xff0c;发布到 iOS、Android、H5、小程序等多个平台。在 Uniapp 开发中&#xff0c;清晰理解页面结构是高效开发的基础&#xff0c;本文将深入剖析 Uniapp 的页面结…

OpenSSL crt key (生成一套用于TLS双向认证的证书密钥)

OpenSSL—— TLS证书 问&#xff1a;如何生成一套TLS证书、密钥呢&#xff1f; 生成一套 TLS 证书&#xff0c;包括 根 CA 证书、服务器证书、客户端证书&#xff0c;可以使用 openssl 命令来完成。完整的步骤如下&#xff0c;包括根 CA、服务器证书和客户端证书的生成。 &am…

Apache Flink架构深度解析:任务调度、算子数据同步与TaskSlot资源管理机制

Apache Flink是一个分布式流处理框架&#xff0c;其核心架构设计围绕有界与无界数据流的统一处理能力展开。以下从任务分配、算子数据同步、TaskManager与JobManager的TaskSlot机制三个维度展开详细分析&#xff1a; 一、任务分配机制 Flink的任务分配基于并行度&#xff08;P…

HTTP 常见状态码技术解析(应用层)

引言 HTTP 状态码是服务器对客户端请求的标准化响应标识&#xff0c;属于应用层协议的核心机制。其采用三位数字编码&#xff0c;首位数字定义状态类别&#xff0c;后两位细化具体场景。 状态码不仅是服务端行为的声明&#xff0c;更是客户端处理响应的关键依据。本文将从协议规…

策略模式Spring框架下开发实例

策略类Spring框架下开发实例 先列出策略模式下需要那些类: 策略接口 (Strategy)&#xff0c;定义所有策略类必须遵循的行为。 具体策略类&#xff08;如 ConcreteStrategyA、ConcreteStrategyB&#xff09;&#xff0c;实现不同的算法或行为。 上下文类 (Context)&#xff0c;…

(一)趣学设计模式 之 单例模式!

目录 一、啥是单例模式&#xff1f;二、为什么要用单例模式&#xff1f;三、单例模式怎么实现&#xff1f;1. 饿汉式&#xff1a;先下手为强&#xff01; &#x1f608;2. 懒汉式&#xff1a;用的时候再创建&#xff01; &#x1f634;3. 枚举&#xff1a;最简单最安全的单例&a…

leetcode刷题第十三天——二叉树Ⅲ

本次刷题顺序是按照卡尔的代码随想录中给出的顺序 翻转二叉树 226. 翻转二叉树 /*** Definition for a binary tree node.* struct TreeNode {* int val;* struct TreeNode *left;* struct TreeNode *right;* };*//*总体思路就是&#xff0c;对于每一个结点&…

如何利用 Vue 的生命周期钩子进行初始化和清理操作?

一、初始化操作的核心钩子 1. created&#xff08;选项式API&#xff09; export default {data() {return { user: null };},created() {// 适合初始化数据、发起非DOM操作请求this.fetchUser();},methods: {async fetchUser() {const response await fetch(/api/user);thi…