SpringBoot自动创建Kafka Topic

并做到分区、副本数可配置

一、操作步骤

1.1 创建 Topic 信息配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Data
@Configuration
@ConfigurationProperties(prefix = "manage.server.kafka")
public class TopicConfig {

@Data
static class Topic {
/**
* topic名称
*/
String name;

/**
* 分区数 默认1
*/
Integer numPartitions = 1;

/**
* 副本数 默认1
*/
Short replicationFactor = 1;

NewTopic convertToNewTopic() {
return new NewTopic(this.name, this.numPartitions, this.replicationFactor);
}
}

@ApiModelProperty("topic列表")
private List<Topic> topics;

}

1.2 创建 Topic Bean 管理类

此类的作用就是把所有配置好的 Topic 信息注册成为 NewTopic Bean 到容器中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Component
@RequiredArgsConstructor
@Slf4j
public class TopicAdministrator {

private final TopicConfig topicConfig;

private final GenericWebApplicationContext applicationContext;

@PostConstruct
public void init() {
topicConfig.getTopics().forEach(topic -> {
applicationContext.registerBean(topic.name, NewTopic.class, topic::convertToNewTopic);
log.info("正在注册bean: {}", topic);
});
}

}

1.3 配置 Topic 信息

这样可以在配置文件中统一配置每个 Topic 的名称、分区数、副本数,项目启动后会自动创建这些 Topic

1
2
3
4
5
6
7
8
9
10
manage:
server:
kafka:
topics:
- name: test
num-partitions: 1
replication-factor: 1
- name: test01
num-partitions: 1
replication-factor: 1

二、原理

2.1 为什么注册 NewTopic Bean 就会自动创建 Topic ?

Spring 帮我们封装了一个 KafkaAdmin 工具:org.springframework.kafka.core.KafkaAdmin

这个类的 bean 实例化后会执行一个初始化方法:

可以看到它从容器中获取 NewTopic.class 的 bean,所以这里它能获取到我们配置好,注册到容器的 topic 配置信息

1
Collection<NewTopic> newTopics = this.applicationContext.getBeansOfType(NewTopic.class, false, false).values();

然后又把获取好的 topic 信息去进行创建操作 addTopicsIfNeeded 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void addTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> topics) {
if (topics.size() > 0) {
Map<String, NewTopic> topicNameToTopic = new HashMap<>();
topics.forEach(t -> topicNameToTopic.compute(t.name(), (k, v) -> t));
DescribeTopicsResult topicInfo = adminClient
.describeTopics(topics.stream()
.map(NewTopic::name)
.collect(Collectors.toList()));
List<NewTopic> topicsToAdd = new ArrayList<>();
Map<String, NewPartitions> topicsToModify = checkPartitions(topicNameToTopic, topicInfo, topicsToAdd);
if (topicsToAdd.size() > 0) {
addTopics(adminClient, topicsToAdd);
}
if (topicsToModify.size() > 0) {
modifyTopics(adminClient, topicsToModify);
}
}
}

所以自动创建 Topic 的事情,Spring 封装的 Kafka 工具里面已经帮我们做了,我们只需要把 NewTopic bean 注册到容器中即可。