差不多可以看消息队列的源码了。 在下从gitee上找到了rocketmq的早期版本(3.2.2), 坏消息是这个2014年的项目里没有单元测试极少, 调试会比较困难. 好消息是这个时候的RocketMQ还没开源多久,里面有很多中文注释。看起来会很舒服。
我们从Broker开始涂鸦。关于RocketMQ中每个角色的作用这里不再陈述:
先从初始化开始:
1 public static void main(String[] args) {
2 start(createBrokerController(args));
3 }
rocketmq是从commandline启动的,createBrokerController函数比较长,
会有很多额外的逻辑干扰你,我这里直接说重点:
- 读取环境变量,没有就用默认值。
- 解析命令行参数。
- 初始化配置类。
- 打印默认配置内容。
- 检查NameServer地址设置是否正确。
- 检查broker的类型(master,slave)
- 初始化日志配置类。
- 再次打印。
- 初始化服务控制对象.
- 最后增加一个关闭Broker时触发的hook.
服务控制对象: Broker各个服务控制器,包括存储层配置,配置文件版本号,消费进度存储,Consumer连接、订阅关系管理等等。
以上就是createBrokerController的内容,函数虽然长,但是并不复杂。
下面为start函数的内容, 在main中的start函数实际上是去委托了BrokerController去执行.
1 public void start() throws Exception {
2
3 // 启动Broker的各层服务
4
5 if (this.messageStore != null) {
6 this.messageStore.start();
7 }
8
9 if (this.remotingServer != null) {
10 this.remotingServer.start();
11 }
12
13 if (this.brokerOuterAPI != null) {
14 this.brokerOuterAPI.start();
15 }
16
17 if (this.pullRequestHoldService != null) {
18 this.pullRequestHoldService.start();
19 }
20
21 if (this.clientHousekeepingService != null) {
22 this.clientHousekeepingService.start();
23 }
24
25 if (this.filterServerManager != null) {
26 this.filterServerManager.start();
27 }
28
29 // 启动时,注册该Broker的信息到所有的NameServer
30 this.registerBrokerAll(true);
31
32 // 定时注册Broker到Name Server
33 this.scheduledExecutorService.scheduleAtFixedRate(() -> {
34 try {
35 this.registerBrokerAll(true);
36 } catch (Exception e) {
37 log.error("registerBrokerAll Exception", e);
38 }
39 }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);
40
41 if (this.brokerStatsManager != null) {
42 // 看起来就是一些数据统计线程
43 this.brokerStatsManager.start();
44 }
45
46 // 删除多余的Topic
47 this.addDeleteTopicTask();
48 }
整个Borker的流程差不多就是这样.代码里并没有什么亮点说实话.