RocketMQ源码9

您所在的位置:网站首页 rocketmq的commitlog RocketMQ源码9

RocketMQ源码9

#RocketMQ源码9| 来源: 网络整理| 查看: 265

1. 启动入口

broker的启动类为org.apache.rocketmq.broker.BrokerStartup,代码如下:

public class BrokerStartup { ... public static void main(String[] args) { start(createBrokerController(args)); } ... }

在main()方法中,仅有一行代码,这行代码包含了两个操作:

createBrokerController(...):创建BrokerControllerstart(...):启动Broker

接下来我们就来分析这两个操作。 2. 创建BrokerController 创建BrokerController的方法为BrokerStartup#createBrokerController,代码如下:

/** * 创建 broker 的配置参数 */ public static BrokerController createBrokerController(String[] args) { ... try { //解析命令行参数 Options options = ServerUtil.buildCommandlineOptions(new Options()); commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { System.exit(-1); } // 处理配置 final BrokerConfig brokerConfig = new BrokerConfig(); final NettyServerConfig nettyServerConfig = new NettyServerConfig(); final NettyClientConfig nettyClientConfig = new NettyClientConfig(); // tls安全相关 nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE, String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING)))); // 配置端口 nettyServerConfig.setListenPort(10911); // 消息存储的配置 final MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); ... // 将命令行中的配置设置到brokerConfig对象中 MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig); // 检查环境变量:ROCKETMQ_HOME if (null == brokerConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); } //省略一些配置 ... // 创建 brokerController final BrokerController controller = new BrokerController( brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig); controller.getConfiguration().registerConfig(properties); // 初始化 boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } // 关闭钩子,在关闭前处理一些操作 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { private volatile boolean hasShutdown = false; private AtomicInteger shutdownTimes = new AtomicInteger(0); @Override public void run() { synchronized (this) { if (!this.hasShutdown) { ... // 这里会发一条注销消息给nameServer controller.shutdown(); ... } } } }, "ShutdownHook")); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); } return null; }

这个方法的代码有点长,但功能并不多,总的来说就三个功能:

处理配置:主要是处理nettyServerConfig与nettyClientConfig的配置,这块就是一些配置解析的操作,处理方式与NameServer很类似,这里就不多说了。创建及初始化controller:调用方法controller.initialize(),这块内容我们后面分析。注册关闭钩子:调用Runtime.getRuntime().addShutdownHook(...),可以在jvm进程关闭前进行一些操作。

2.1 controller实例化 BrokerController的创建及初始化是在BrokerStartup#createBrokerController方法中进行,我们先来看看它的构造方法:

public BrokerController( final BrokerConfig brokerConfig, final NettyServerConfig nettyServerConfig, final NettyClientConfig nettyClientConfig, final MessageStoreConfig messageStoreConfig ) { // 4个核心配置信息 this.brokerConfig = brokerConfig; this.nettyServerConfig = nettyServerConfig; this.nettyClientConfig = nettyClientConfig; this.messageStoreConfig = messageStoreConfig; // 管理consumer消费消息的offset this.consumerOffsetManager = new ConsumerOffsetManager(this); // 管理topic配置 this.topicConfigManager = new TopicConfigManager(this); // 处理 consumer 拉消息请求的 this.pullMessageProcessor = new PullMessageProcessor(this); this.pullRequestHoldService = new PullRequestHoldService(this); // 消息送达的监听器 this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService); ... // 往外发消息的组件 this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig); ... }

BrokerController的构造方法很长,基本都是一些赋值操作,代码中已列出关键项,这些包括:

核心配置赋值:主要是brokerConfig/nettyServerConfig/nettyClientConfig/messageStoreConfig四个配置ConsumerOffsetManager:管理consumer消费消息位置的偏移量,偏移量表示消费者组消费该topic消息的位置,后面再消费时,就从该位置后消费,避免重复消费消息,也避免了漏消费消息。topicConfigManager:topic配置管理器,就是用来管理topic配置的,如topic名称,topic队列数量pullMessageProcessor:消息处理器,用来处理消费者拉消息messageArrivingListener:消息送达的监听器,当生产者的消息送达时,由该监听器监听brokerOuterAPI:往外发消息的组件,如向NameServer发送注册/注销消息

以上这些组件的用处,这里先混个脸熟,我们后面再分析。 2.2 初始化controller 我们再来看看初始化操作,方法为BrokerController#initialize:

public boolean initialize() throws CloneNotSupportedException { // 加载配置文件中的配置 boolean result = this.topicConfigManager.load(); result = result && this.consumerOffsetManager.load(); result = result && this.subscriptionGroupManager.load(); result = result && this.consumerFilterManager.load(); if (result) { try { // 消息存储管理组件,管理磁盘上的消息 this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig); // 启用了DLeger,就创建DLeger相关组件 if (messageStoreConfig.isEnableDLegerCommitLog()) { ... } // broker统计组件 this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore); //load plugin MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig); this.messageStore = MessageStoreFactory.build(context, this.messageStore); this.messageStore.getDispatcherList().addFirst( new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager)); } catch (IOException e) { result = false; log.error("Failed to initialize", e); } } // 加载磁盘上的记录,如commitLog写入的位置、消费者主题/队列的信息 result = result && this.messageStore.load(); if (result) { // 处理 nettyServer this.remotingServer = new NettyRemotingServer( this.nettyServerConfig, this.clientHousekeepingService); NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone(); fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2); this.fastRemotingServer = new NettyRemotingServer( fastConfig, this.clientHousekeepingService); // 创建线程池start... 这里会创建多种类型的线程池 ... // 处理consumer pull操作的线程池 this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getPullMessageThreadPoolNums(), this.brokerConfig.getPullMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.pullThreadPoolQueue, new ThreadFactoryImpl("PullMessageThread_")); ... // 创建线程池end... // 注册处理器 this.registerProcessor(); // 启动定时任务start... 这里会启动好多的定时任务 ... // 定时将consumer消费到的offset进行持久化操作,即将数据保存到磁盘上 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.consumerOffsetManager.persist(); } catch (Throwable e) { log.error("schedule persist consumerOffset error.", e); } } }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS); ... // 启动定时任务end... ... // 开启 DLeger 的一些操作 if (!messageStoreConfig.isEnableDLegerCommitLog()) { ... } // 处理tls配置 if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) { ... } // 初始化一些操作 initialTransaction(); initialAcl(); initialRpcHooks(); } return result; }

这个还是很长,关键部分都做了注释,该方法所做的工作如下:

加载配置文件中的配置赋值与初始化操作创建线程池注册处理器启动定时任务

这里我们来看下注册处理器的操作this.registerProcessor(): 2.2.1 注册处理器:BrokerController#registerProcessor this.registerProcessor()实际调用的方法是BrokerController#registerProcessor,代码如下:

public void registerProcessor() { /** * SendMessageProcessor */ SendMessageProcessor sendProcessor = new SendMessageProcessor(this); sendProcessor.registerSendMessageHook(sendMessageHookList); sendProcessor.registerConsumeMessageHook(consumeMessageHookList); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor); ... /** * PullMessageProcessor */ this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor); this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList); /** * ReplyMessageProcessor */ ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this); replyMessageProcessor.registerSendMessageHook(sendMessageHookList); ... }

这个方法里注册了许许多多的处理器,这里仅列出了与消息相关的内容,如发送消息、回复消息、拉取消息等,后面在处理producer/consumer的消息时,就会用到这些处理器,这里先不展开分析。 2.2.2 remotingServer注册处理器:NettyRemotingServer#registerProcessor 我们来看下remotingServer注册处理器的操作,方法为NettyRemotingServer#registerProcessor:

public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer { ... @Override public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) { ExecutorService executorThis = executor; if (null == executor) { executorThis = this.publicExecutor; } Pair pair = new Pair(processor, executorThis); // 注册到processorTable 中 this.processorTable.put(requestCode, pair); } ... }

最终,这些处理器注册到了processorTable中,它是NettyRemotingAbstract的成员变量,定义如下: HashMap 复制代码 这是一个hashMap的结构,key为code,value为Pair,该类中有两个成员变量:NettyRequestProcessor、ExecutorService,code与NettyRequestProcessor的映射关系就是在hashMap里存储的。 2.3 注册关闭钩子:Runtime.getRuntime().addShutdownHook(...) 接着我们来看看注册关闭钩子的操作:

// 关闭钩子,在关闭前处理一些操作 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { private volatile boolean hasShutdown = false; private AtomicInteger shutdownTimes = new AtomicInteger(0); @Override public void run() { synchronized (this) { if (!this.hasShutdown) { ... // 这里会发一条注销消息给nameServer controller.shutdown(); ... } } } }, "ShutdownHook"));

跟进BrokerController#shutdown方法:

public void shutdown() { // 调用各组件的shutdown方法 ... // 发送注销消息到NameServer this.unregisterBrokerAll(); ... // 持久化consumer的消费偏移量 this.consumerOffsetManager.persist(); // 又是调用各组件的shutdown方法 ...

这个方法里会调用各组件的shutdown()方法、发送注销消息给NameServer、持久化consumer的消费偏移量,这里我们主要看发送注销消息的方法BrokerController#unregisterBrokerAll:

private void unregisterBrokerAll() { // 发送一条注销消息给nameServer this.brokerOuterAPI.unregisterBrokerAll( this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId()); }

继续进入BrokerOuterAPI#unregisterBrokerAll:

public void unregisterBrokerAll( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId ) { // 获取所有的 nameServer,遍历发送注销消息 List nameServerAddressList = this.remotingClient.getNameServerAddressList(); if (nameServerAddressList != null) { for (String namesrvAddr : nameServerAddressList) { try { this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId); log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr); } catch (Exception e) { log.warn("unregisterBroker Exception, {}", namesrvAddr, e); } } } }

这个方法里,会获取到所有的nameServer,然后逐个发送注销消息,继续进入BrokerOuterAPI#unregisterBroker方法:

public void unregisterBroker( final String namesrvAddr, final String clusterName, final String brokerAddr, final String brokerName, final long brokerId ) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader(); requestHeader.setBrokerAddr(brokerAddr); requestHeader.setBrokerId(brokerId); requestHeader.setBrokerName(brokerName); requestHeader.setClusterName(clusterName); // 发送的注销消息:RequestCode.UNREGISTER_BROKER RemotingCommand request = RemotingCommand.createRequestCommand( c, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { return; } default: break; } throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr); }

最终调用的是RemotingClient#invokeSync进行消息发送,请求code是RequestCode.UNREGISTER_BROKER,这就与NameServer接收broker的注销消息对应上了。 3. 启动Broker:start(...) 我们再来看看Broker的启动流程,处理方法为BrokerController#start:

public void start() throws Exception { // 启动各组件 // 启动消息存储相关组件 if (this.messageStore != null) { this.messageStore.start(); } // 启动 remotingServer,其实就是启动一个netty服务,用来接收producer传来的消息 if (this.remotingServer != null) { this.remotingServer.start(); } ... // broker对外发放消息的组件,向nameServer上报存活消息时使用了它,也是一个netty服务 if (this.brokerOuterAPI != null) { this.brokerOuterAPI.start(); } ... // broker 核心的心跳注册任务 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } } // brokerConfig.getRegisterNameServerPeriod() 值为 1000 * 30,最终计算得到默认30秒执行一次 }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS); ... }

这个方法主要就是启动各组件了,这里列出了几大重要组件的启动:

messageStore:消息存储组件,在这个组件里,会启动消息存储相关的线程,如消息的投递操作、commitLog文件的flush操作、comsumeQueue文件的flush操作等remotingServer:netty服务,用来接收请求消息,如producer发送过来的消息brokerOuterAPI:也是一个netty服务,用来对外发送消息,如向nameServer上报心跳消息启动定时任务:broker向nameServer发送注册消息

这里我们重点来看定时任务是如何发送心跳发送的。 处理注册消息发送的时间间隔如下:

Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)

这行代码看着长,但意思就一句话:时间间隔可以自行配置,但不能小于10s,不能大于60s,默认是30s.

处理消息注册的方法为BrokerController#registerBrokerAll(...),代码如下:

public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); // 处理topic相关配置 if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { ... } // 这里会判断是否需要进行注册 if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), this.brokerConfig.getRegisterBrokerTimeoutMills())) { // 进行注册操作 doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper); } }

这个方法就是用来处理注册操作的,不过注册前会先验证下是否需要注册,验证是否需要注册的方法为BrokerController#needRegister, 代码如下:

private boolean needRegister(final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final int timeoutMills) { TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); // 判断是否需要进行注册 List changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills); // 有一个发生了变化,就表示需要注册了 boolean needRegister = false; for (Boolean changed : changeList) { if (changed) { needRegister = true; break; } } return needRegister; }

这个方法调用了brokerOuterAPI.needRegister(...)来判断broker是否发生了变化,只要一个NameServer上发生了变化,就说明需要进行注册操作。 brokerOuterAPI.needRegister(...)是如何判断broker是否发生了变化的呢?继续跟进BrokerOuterAPI#needRegister:

public List needRegister( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final TopicConfigSerializeWrapper topicConfigWrapper, final int timeoutMills) { final List changedList = new CopyOnWriteArrayList(); // 获取所有的 nameServer List nameServerAddressList = this.remotingClient.getNameServerAddressList(); if (nameServerAddressList != null && nameServerAddressList.size() > 0) { final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); // 遍历所有的nameServer,逐一发送请求 for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { @Override public void run() { try { QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader(); ... // 向nameServer发送消息,命令是 RequestCode.QUERY_DATA_VERSION RemotingCommand request = RemotingCommand .createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader); // 把当前的 DataVersion 发到 nameServer request.setBody(topicConfigWrapper.getDataVersion().encode()); // 发请求到nameServer RemotingCommand response = remotingClient .invokeSync(namesrvAddr, request, timeoutMills); DataVersion nameServerDataVersion = null; Boolean changed = false; switch (response.getCode()) { case ResponseCode.SUCCESS: { QueryDataVersionResponseHeader queryDataVersionResponseHeader = (QueryDataVersionResponseHeader) response .decodeCommandCustomHeader(QueryDataVersionResponseHeader.class); changed = queryDataVersionResponseHeader.getChanged(); byte[] body = response.getBody(); if (body != null) { // 拿到 DataVersion nameServerDataVersion = DataVersion.decode(body, D ataVersion.class); // 这里是判断的关键 if (!topicConfigWrapper.getDataVersion() .equals(nameServerDataVersion)) { changed = true; } } if (changed == null || changed) { changedList.add(Boolean.TRUE); } } default: break; } ... } catch (Exception e) { ... } finally { countDownLatch.countDown(); } } }); } try { countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.error("query dataversion from nameserver countDownLatch await Exception", e); } } return changedList; }

这个方法里,先是遍历所有的nameServer,向每个nameServer都发送一条code为RequestCode.QUERY_DATA_VERSION的参数,参数为当前broker的DataVersion,当nameServer收到消息后,就返回nameServer中保存的、与当前broker对应的DataVersion,当两者版本不相等时,就表明当前broker发生了变化,需要重新注册。 DataVersion是个啥呢?它的部分代码如下:

public class DataVersion extends RemotingSerializable { // 时间戳 private long timestamp = System.currentTimeMillis(); // 计数器,可以理解为最近的版本号 private AtomicLong counter = new AtomicLong(0); public void nextVersion() { this.timestamp = System.currentTimeMillis(); this.counter.incrementAndGet(); } /** * equals 方法,当 timestamp 与 counter 都相等时,则两者相等 */ @Override public boolean equals(final Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; final DataVersion that = (DataVersion) o; if (timestamp != that.timestamp) { return false; } if (counter != null && that.counter != null) { return counter.longValue() == that.counter.longValue(); } return (null == counter) && (null == that.counter); } ... }

从DataVersion的equals()方法来看,只有当timestamp与counter都相等时,两个DataVersion对象才相等。那这两个值会在哪里被修改呢?从DataVersion#nextVersion方法的调用情况来看,引起这两个值的变化主要有两种:

broker 上新创建了一个 topictopic的发了的变化

在这两种情况下,DataVersion#nextVersion方法被调用,从而引起DataVersion的改变。DataVersion改变了,就表明当前broker需要向nameServer注册了。 让我们再回到BrokerController#registerBrokerAll(...)方法:

public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { ... // 这里会判断是否需要进行注册 if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), this.brokerConfig.getRegisterBrokerTimeoutMills())) { // 进行注册操作 doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper); } }

处理注册的方法为BrokerController#doRegisterBrokerAll,稍微看下它的流程:

private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway, TopicConfigSerializeWrapper topicConfigWrapper) { // 注册 List registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll( this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), this.getHAServerAddr(), // 这个对象里就包含了当前broker的版本信息 topicConfigWrapper, this.filterServerManager.buildNewFilterServerList(), oneway, this.brokerConfig.getRegisterBrokerTimeoutMills(), this.brokerConfig.isCompressedRegister()); ... }

继续跟下去,最终调用的是BrokerOuterAPI#registerBroker方法:

private RegisterBrokerResult registerBroker( final String namesrvAddr, final boolean oneway, final int timeoutMills, final RegisterBrokerRequestHeader requestHeader, final byte[] body ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException { // 构建请求 RemotingCommand request = RemotingCommand .createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader); request.setBody(body); // 处理发送操作:sendOneWay if (oneway) { try { // 注册操作 this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills); } catch (RemotingTooMuchRequestException e) { // Ignore } return null; ... } .... }

所以,所谓的注册操作,就是当nameServer发送一条code为RequestCode.REGISTER_BROKER的消息,消息里会带上当前broker的topic信息、版本号等。 4.总结 本文主要分析了broker的启动流程,总的来说,启动流程分为3个:

解析配置文件,这一步会解析各种配置,并将其赋值到对应的对象中BrokerController创建及初始化:创建了BrokerController对象,并进行初始化操作,所谓的初始化,就是加载配置文件中配置、创建线程池、注册请求处理器、启动定时任务等BrokerController启动:这一步是启动broker的核心组件,如messageStore(消息存储)、remotingServer(netty服务,用来处理producer与consumer请求)、brokerOuterAPI(netty服务,用来向nameServer上报当前broker信息)等。

在分析启动过程中,重点分析了两类消息的发送:

在ShutdownHook中,broker会向nameServer发送注销消息,这表明在broker关闭前,nameServer会清除当前broker的注册信息 broker启动后,会启动一个定时任务,定期判断是否需要向nameServer注册,判断是否需要注册时,会向nameServer发送code为QUERY_DATA_VERSION的消息,从nameServer得到当前broker的版本号,该版本号与本地版本号不一致,就表示需要向broker重新注册了,即发送注册消息。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3