并做到分区、副本数可配置
一、操作步骤
1.1 创建 Topic 信息配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
   | @Data @Configuration @ConfigurationProperties(prefix = "manage.server.kafka") public class TopicConfig {
      @Data     static class Topic {         
 
          String name;
          
 
          Integer numPartitions = 1;
          
 
          Short replicationFactor = 1;
          NewTopic convertToNewTopic() {             return new NewTopic(this.name, this.numPartitions, this.replicationFactor);         }     }
      @ApiModelProperty("topic列表")     private List<Topic> topics;
  }
   | 
1.2 创建 Topic Bean 管理类
此类的作用就是把所有配置好的 Topic 信息注册成为 NewTopic Bean 到容器中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
   | @Component @RequiredArgsConstructor @Slf4j public class TopicAdministrator {
      private final TopicConfig topicConfig;
      private final GenericWebApplicationContext applicationContext;
      @PostConstruct     public void init() {         topicConfig.getTopics().forEach(topic -> {             applicationContext.registerBean(topic.name, NewTopic.class, topic::convertToNewTopic);             log.info("正在注册bean: {}", topic);         });     }
  }
   | 
1.3 配置 Topic 信息
这样可以在配置文件中统一配置每个 Topic 的名称、分区数、副本数,项目启动后会自动创建这些 Topic
1 2 3 4 5 6 7 8 9 10
   | manage:   server:     kafka:       topics:         - name: test           num-partitions: 1           replication-factor: 1         - name: test01           num-partitions: 1           replication-factor: 1
   | 
二、原理
2.1 为什么注册 NewTopic Bean 就会自动创建 Topic ?
Spring 帮我们封装了一个 KafkaAdmin 工具:org.springframework.kafka.core.KafkaAdmin
这个类的 bean 实例化后会执行一个初始化方法:

可以看到它从容器中获取 NewTopic.class 的 bean,所以这里它能获取到我们配置好,注册到容器的 topic 配置信息
1
   | Collection<NewTopic> newTopics = this.applicationContext.getBeansOfType(NewTopic.class, false, false).values();
   | 
然后又把获取好的 topic 信息去进行创建操作 addTopicsIfNeeded 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
   | private void addTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> topics) { 	if (topics.size() > 0) { 		Map<String, NewTopic> topicNameToTopic = new HashMap<>(); 		topics.forEach(t -> topicNameToTopic.compute(t.name(), (k, v) -> t)); 		DescribeTopicsResult topicInfo = adminClient 				.describeTopics(topics.stream() 						.map(NewTopic::name) 						.collect(Collectors.toList())); 		List<NewTopic> topicsToAdd = new ArrayList<>(); 		Map<String, NewPartitions> topicsToModify = checkPartitions(topicNameToTopic, topicInfo, topicsToAdd); 		if (topicsToAdd.size() > 0) { 			addTopics(adminClient, topicsToAdd); 		} 		if (topicsToModify.size() > 0) { 			modifyTopics(adminClient, topicsToModify); 		} 	} }
  | 
所以自动创建 Topic 的事情,Spring 封装的 Kafka 工具里面已经帮我们做了,我们只需要把 NewTopic bean 注册到容器中即可。