并做到分区、副本数可配置
一、操作步骤
1.1 创建 Topic 信息配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @Data @Configuration @ConfigurationProperties(prefix = "manage.server.kafka") public class TopicConfig {
@Data static class Topic {
String name;
Integer numPartitions = 1;
Short replicationFactor = 1;
NewTopic convertToNewTopic() { return new NewTopic(this.name, this.numPartitions, this.replicationFactor); } }
@ApiModelProperty("topic列表") private List<Topic> topics;
}
|
1.2 创建 Topic Bean 管理类
此类的作用就是把所有配置好的 Topic 信息注册成为 NewTopic Bean 到容器中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Component @RequiredArgsConstructor @Slf4j public class TopicAdministrator {
private final TopicConfig topicConfig;
private final GenericWebApplicationContext applicationContext;
@PostConstruct public void init() { topicConfig.getTopics().forEach(topic -> { applicationContext.registerBean(topic.name, NewTopic.class, topic::convertToNewTopic); log.info("正在注册bean: {}", topic); }); }
}
|
1.3 配置 Topic 信息
这样可以在配置文件中统一配置每个 Topic 的名称、分区数、副本数,项目启动后会自动创建这些 Topic
1 2 3 4 5 6 7 8 9 10
| manage: server: kafka: topics: - name: test num-partitions: 1 replication-factor: 1 - name: test01 num-partitions: 1 replication-factor: 1
|
二、原理
2.1 为什么注册 NewTopic Bean 就会自动创建 Topic ?
Spring 帮我们封装了一个 KafkaAdmin 工具:org.springframework.kafka.core.KafkaAdmin
这个类的 bean 实例化后会执行一个初始化方法:
可以看到它从容器中获取 NewTopic.class 的 bean,所以这里它能获取到我们配置好,注册到容器的 topic 配置信息
1
| Collection<NewTopic> newTopics = this.applicationContext.getBeansOfType(NewTopic.class, false, false).values();
|
然后又把获取好的 topic 信息去进行创建操作 addTopicsIfNeeded
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| private void addTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> topics) { if (topics.size() > 0) { Map<String, NewTopic> topicNameToTopic = new HashMap<>(); topics.forEach(t -> topicNameToTopic.compute(t.name(), (k, v) -> t)); DescribeTopicsResult topicInfo = adminClient .describeTopics(topics.stream() .map(NewTopic::name) .collect(Collectors.toList())); List<NewTopic> topicsToAdd = new ArrayList<>(); Map<String, NewPartitions> topicsToModify = checkPartitions(topicNameToTopic, topicInfo, topicsToAdd); if (topicsToAdd.size() > 0) { addTopics(adminClient, topicsToAdd); } if (topicsToModify.size() > 0) { modifyTopics(adminClient, topicsToModify); } } }
|
所以自动创建 Topic 的事情,Spring 封装的 Kafka 工具里面已经帮我们做了,我们只需要把 NewTopic bean 注册到容器中即可。