SpringBoot自动创建Kafka Topic

并做到分区、副本数可配置

一、操作步骤

1.1 创建 Topic 信息配置类

@Data
@Configuration
@ConfigurationProperties(prefix = "manage.server.kafka")
public class TopicConfig {

    @Data
    static class Topic {
        /**
         * topic名称
         */
        String name;

        /**
         * 分区数 默认1
         */
        Integer numPartitions = 1;

        /**
         * 副本数 默认1
         */
        Short replicationFactor = 1;

        NewTopic convertToNewTopic() {
            return new NewTopic(this.name, this.numPartitions, this.replicationFactor);
        }
    }

    @ApiModelProperty("topic列表")
    private List<Topic> topics;

}

1.2 创建 Topic Bean 管理类

此类的作用就是把所有配置好的 Topic 信息注册成为 NewTopic Bean 到容器中

@Component
@RequiredArgsConstructor
@Slf4j
public class TopicAdministrator {

    private final TopicConfig topicConfig;

    private final GenericWebApplicationContext applicationContext;

    @PostConstruct
    public void init() {
        topicConfig.getTopics().forEach(topic -> {
            applicationContext.registerBean(topic.name, NewTopic.class, topic::convertToNewTopic);
            log.info("正在注册bean: {}", topic);
        });
    }

}

1.3 配置 Topic 信息

这样可以在配置文件中统一配置每个 Topic 的名称、分区数、副本数,项目启动后会自动创建这些 Topic

manage:
  server:
    kafka:
      topics:
        - name: test
          num-partitions: 1
          replication-factor: 1
        - name: test01
          num-partitions: 1
          replication-factor: 1

二、原理

2.1 为什么注册 NewTopic Bean 就会自动创建 Topic ?

Spring 帮我们封装了一个 KafkaAdmin 工具:org.springframework.kafka.core.KafkaAdmin

这个类的 bean 实例化后会执行一个初始化方法:

可以看到它从容器中获取 NewTopic.class 的 bean,所以这里它能获取到我们配置好,注册到容器的 topic 配置信息

Collection<NewTopic> newTopics = this.applicationContext.getBeansOfType(NewTopic.class, false, false).values();

然后又把获取好的 topic 信息去进行创建操作 addTopicsIfNeeded 方法:

private void addTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> topics) {
	if (topics.size() > 0) {
		Map<String, NewTopic> topicNameToTopic = new HashMap<>();
		topics.forEach(t -> topicNameToTopic.compute(t.name(), (k, v) -> t));
		DescribeTopicsResult topicInfo = adminClient
				.describeTopics(topics.stream()
						.map(NewTopic::name)
						.collect(Collectors.toList()));
		List<NewTopic> topicsToAdd = new ArrayList<>();
		Map<String, NewPartitions> topicsToModify = checkPartitions(topicNameToTopic, topicInfo, topicsToAdd);
		if (topicsToAdd.size() > 0) {
			addTopics(adminClient, topicsToAdd);
		}
		if (topicsToModify.size() > 0) {
			modifyTopics(adminClient, topicsToModify);
		}
	}
}

所以自动创建 Topic 的事情,Spring 封装的 Kafka 工具里面已经帮我们做了,我们只需要把 NewTopic bean 注册到容器中即可。


SpringBoot自动创建Kafka Topic
https://www.powercheng.fun/articles/47bee030/
作者
powercheng
发布于
2023年11月15日
许可协议