RocketMQ学习之安装部署及基础讲解 |
您所在的位置:网站首页 › rocketmq480 › RocketMQ学习之安装部署及基础讲解 |
一、rocketMQ简介
RocektMQ是阿里巴巴在2012年开源的一个纯java、分布式、队列模型的第三代消息中间件。 2016年11月11号,双十一大促见证了RocketMQ低延迟存储架构的成功试水,99.996%的延迟落在了10ms以内,极个别由于GC引发的停顿在50ms以内,其高性能、低延时和高可靠的特性承载了近年来双十一17万笔/秒的交易峰值,在整个生产链路上都有着稳定和出色的表现。其在同年捐赠给Apache后正式进入孵化期。并于2017年9月RocketMQ正式从Apache社区正式毕业,成为Apache顶级项目。 二、相关术语 1、消息模型(Message Model)RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。Consumer Group 由多个Consumer 实例构成。 2、消息生产者(Producer)Producer:消息生产者,负责产生消息,一般由业务系统负责产生消息。 Producer Group:一类Producer的集合名称,这类Producer通常发送一类消息,且发送逻辑一致。 一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。 3、消息消费者(Consumer)Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费。 Push Consumer:Consumer的一种,应用通常向Consumer对象注册一个Listener接口,一旦收到消息,Consumer对象立刻回调Listener接口方法。该消费模式一般实时性较高。 Pull Consumer:Consumer的一种,应用通常主动调用Consumer的拉消息方法从Broker拉消息,主动权由应用控制。 Consumer Group:一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致。 4、代理服务器(Broker)消息中转角色,负责存储消息,转发消息,一般也称为Server。 Master:Broker中的主节点。 Slave:Broker中的副节点。 5、名字服务(Name Server)名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,集群中Nameserver互相独立,彼此没有通信关系,单台Nameserver挂掉,不影响其他Nameserver,即使全部挂掉,也不影响业务系统使用。而且Nameserver不会有频繁的读写,所以性能开销非常小,稳定性很高。 6、主题(Topic)表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。 7、消息(Message)消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。 8、消息队列(Message Queue)在RocketMQ中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用Offset来访问,offset为java long类型,64位,理论上在100年内不会溢出,所以认为是长度无限,另外队列中只保存最近几天的数据,之前的数据会按照过期时间来删除。也可以认为Message Queue是一个长度无限的数组,offset就是下标。 9、消费类型广播消费:一条消息被多个Consumer消费,即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group中的每个Consumer都消费一次,广播消费中的Consumer Group概念可以认为在消息划分方面无意义。 集群消费:一个Consumer Group中的Consumer实例平均分摊消费消息。例如某个Topic有9条消息,其中一个Consumer Group有3个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的3条消息。 10、消息顺序普通顺序消息:消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。主要指的是局部顺序,即一类消息为满足顺序性,必须Producer单线程顺序发送,且发送到同一个队列,这样Consumer就可以按照Producer发送的顺序去消费消息。但是一旦发生通信异常,Broker重启,由于队列总数发生变化,哈希取模后定位的队列会变化,产生短暂的消息顺序不一致。 严格顺序消息:顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式Failover特性,即Broker集群中只要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。如果服务器部署为同步双写模式,此缺陷可通过备机自动切换为主避免,不过仍然会存在几分钟的服务不可用。 11、消息写入机制异步复制:消息写入master节点,再由master节点异步复制到slave节点,类似mysql中的master-slave机制。 同步双写:消息同时写入master节点和slave节点。 12、持久化策略异步刷盘:Broker的一种持久化策略,消息写入pagecache后,直接返回。由异步线程负责将pagecache写入硬盘。 同步刷盘:Broker的一种持久化策略,消息写入pagecache后,由同步线程将pagecache写入硬盘后,再返回。 三、RocketMQ集群部署模式RocketMQ作为消息中间件,其主要功能为消息的Publish/Subscribe。而Broker担任的消息转发和存储功能,其部署方式有很多种: 1、单Master优点:除了配置简单没什么优点。 缺点:不可靠,该机器重启或宕机,将导致整个服务不可用。 2、多Master优点:配置简单,性能最高。 缺点:可能会有少量消息丢失,单台机器重启或宕机期间,该机器下未被消费的消息在机器恢复前不可订阅,影响消息实时性。 3、异步多Master多Slave每个Master配一个Slave,有多对Master-Slave,集群采用异步复制方式,主备有短暂消息延迟,毫秒级。 优点:性能同多Master几乎一样,实时性高,主备间切换对应用透明,不需人工干预。 缺点:Master宕机或磁盘损坏时会有少量消息丢失。 4、同步多Master多Slave每个Master配一个Slave,有多对Master-Slave,集群采用同步双写方式,主备都写成功,向应用返回成功。 优点:服务可用性与数据可用性非常高。 缺点:性能比异步集群略低,当前版本主宕备不能自动切换为主。 四、RocketMQ架构首先我们来看看RocketMQ架构是怎样的: 接下来看看各个组件如何启动: 最后看看各个组件如何协作: 首先我们来尝试进行单机部署,本实例是在Mac环境下进行部署,Linux机器大致一致。 1.1、官方推荐环境 64bit OS, Linux/Unix/Mac is recommended;(Windows user see guide below) 64bit JDK 1.8+; Maven 3.2.x; Git; 4g+ free disk for Broker server 1.2、软件包获取软件包可以下载源文件进行编译然后安装,也可以直接下载二进制包进行解压安装,本文就以二进制包的安装为例,版本:rocketmq-all-4.7.0-bin-release.zip。 下载地址为:http://rocketmq.apache.org/docs/quick-start/ 中文的GitHub地址:https://github.com/apache/rocketmq/tree/master/docs/cn 1.3、JDK环境本文JDK版本选择 1.8: HoudeMacBook-Pro:~ ******$ java -version java version "1.8.0_131" Java(TM) SE Runtime Environment (build 1.8.0_131-b11) Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)注意选择新的版本启动可能出现以下问题: Java HotSpot(TM) 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release. Unrecognized VM option 'UseCMSCompactAtFullCollection' Error: Could not create the Java Virtual Machine. Error: A fatal exception has occurred. Program will exit. 1.4、rocketmq服务端安装解压安装包:rocketmq-all-4.7.0-bin-release.zip。 unzip rocketmq-all-4.7.0-bin-release.zip配置rocketmq的环境变量,在 ~/.bash_profile 最后加入: export ROCKETMQ_HOME=/Volumes/work/rocketmq-all-4.7.0-bin-release export PATH=$PATH::$ROCKETMQ_HOME/bin然后执行source命令使配置文件生效: source ~/.bash_profile给下列命令可运行权限: HoudeMacBook-Pro:~ houjing$ cd $ROCKETMQ_HOME/bin HoudeMacBook-Pro:bin houjing$ pwd /Volumes/work/rocketmq-all-4.7.0-bin-release/bin HoudeMacBook-Pro:bin houjing$ chmod +x mqadmin mqbroker mqfiltersrv mqshutdown mqnamesrv新建日志目录: HoudeMacBook-Pro:~ ******$ cd $ROCKETMQ_HOME HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release ******$ mkdir logs 1.5、启动namesever HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ nohup mqnamesrv 1>/Volumes/work/rocketmq-all-4.7.0-bin-release/logs/ng.log 2>/Volumes/work/rocketmq-all-4.7.0-bin-release/logs/ng-err.log & HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ ps -ef|grep rocket 501 3464 3224 0 8:54下午 ttys000 0:00.02 /bin/sh /Volumes/work/rocketmq-all-4.7.0-bin-release/bin/mqnamesrv 501 3465 3464 0 8:54下午 ttys000 0:00.01 sh /Volumes/work/rocketmq-all-4.7.0-bin-release/bin/runserver.sh org.apache.rocketmq.namesrv.NamesrvStartup 501 3495 3465 0 8:54下午 ttys000 0:02.88 /Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/bin/java -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC -verbose:gc -Xloggc:/Volumes/RAMDisk/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m -XX:-OmitStackTraceInFastThrow -XX:-UseLargePages -Djava.ext.dirs=/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext:/Volumes/work/rocketmq-all-4.7.0-bin-release/bin/../lib -cp .:/Volumes/work/rocketmq-all-4.7.0-bin-release/bin/../conf:/lib/tools.jar:/lib/dt.jar org.apache.rocketmq.namesrv.NamesrvStartup通过日志检查是否启动成功: HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ tail -f /Volumes/work/rocketmq-all-4.7.0-bin-release/logs/ng.log Create RAMDisk /Volumes/RAMDisk for gc logging on Darwin OS. The Name Server boot success. serializeType=JSON 1.6、启动broker启动broker,在启动borker之前须要指定nameserver地址。例如本地单机部署 127.0.0.1 为所在服务器IP: HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ export NAMESRV_ADDR=127.0.0.1:9876 HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ nohup mqbroker >/Volumes/work/rocketmq-all-4.7.0-bin-release/logs/mq.log & [2] 3584 HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ tail -f /Volumes/work/rocketmq-all-4.7.0-bin-release/logs/mq.log The broker[HoudeMacBook-Pro.local, 192.168.100.28:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876 1.7、模拟生产者生产 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ export NAMESRV_ADDR=127.0.0.1:9876 HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer 21:14:59.611 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN Please initialize the logger system properly. SendResult [sendStatus=SEND_OK, msgId=C0A8641C0E694D7E188675DB340D0000, offsetMsgId=C0A8641C00002A9F000000000002BEB2, messageQueue=MessageQueue [topic=TopicTest, brokerName=HoudeMacBook-Pro.local, queueId=1], queueOffset=250] SendResult [sendStatus=SEND_OK, msgId=C0A8641C0E694D7E188675DB34320001, offsetMsgId=C0A8641C00002A9F000000000002BF64, messageQueue=MessageQueue [topic=TopicTest, brokerName=HoudeMacBook-Pro.local, queueId=2], queueOffset=250] SendResult [sendStatus=SEND_OK, msgId=C0A8641C0E694D7E188675DB34350002, offsetMsgId=C0A8641C00002A9F000000000002C016, messageQueue=MessageQueue [topic=TopicTest, brokerName=HoudeMacBook-Pro.local, queueId=3], queueOffset=250] 1.8、模拟消费者进行消费 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer 21:16:28.569 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework Consumer Started. ConsumeMessageThread_4 Receive New Messages: [MessageExt [brokerName=HoudeMacBook-Pro.local, queueId=1, storeSize=178, queueOffset=251, sysFlag=0, bornTimestamp=1592918100028, bornHost=/192.168.100.28:58376, storeTimestamp=1592918100029, storeHost=/192.168.100.28:10911, msgId=C0A8641C00002A9F000000000002C17A, commitLogOffset=180602, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=500, CONSUME_START_TIME=1592918188998, UNIQ_KEY=C0A8641C0E694D7E188675DB343C0004, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52], transactionId='null'}]] ConsumeMessageThread_10 Receive New Messages: [MessageExt [brokerName=HoudeMacBook-Pro.local, queueId=1, storeSize=179, queueOffset=254, sysFlag=0, bornTimestamp=1592918100065, bornHost=/192.168.100.28:58376, storeTimestamp=1592918100066, storeHost=/192.168.100.28:10911, msgId=C0A8641C00002A9F000000000002C9D8, commitLogOffset=182744, bodyCRC=1659149091, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=500, CONSUME_START_TIME=1592918188998, UNIQ_KEY=C0A8641C0E694D7E188675DB34600010, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 54], transactionId='null'}]] 1.9、关闭RockectMQ mqshutdown namesrv #关闭nameserver mqshutdown broker #关闭broker HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ mqshutdown namesrv The mqnamesrv(3582) is running... Send shutdown request to mqnamesrv(3582) OK HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ mqshutdown broker The mqbroker(3589) is running... Send shutdown request to mqbroker(3589) OK 2、集群部署RocketMQ的多Master多Slave模式有两种:异步复制和同步双写。 异步复制:每个 Master 配置一个 Slave,有多对Master-Slave, HA采用异步复制方式,主备有短暂消息延迟,毫秒级。 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为Master 宕机后,消费者仍然可以从 Slave消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。 缺点: Master 宕机,磁盘损坏情况,会丢失少量消息。 同步双写:每个 Master 配置一个 Slave,有多对Master-Slave, HA采用同步双写方式,主备都写成功,向应用返回成功。 优点:数据与服务都无单点, Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高。 缺点:性能比异步复制模式略低,大约低10%左右,发送单个消息的 RT会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能。 我们首先来部署一个异步复制的RocketMQ,在RocketMQ路径下有两个文件夹: drwxr-xr-x 6 houjing admin 192 3 4 09:59 2m-2s-async/ drwxr-xr-x 6 houjing admin 192 3 4 09:59 2m-2s-sync/分别用于异步复制和同步双写配置,我们进入到目录2m-2s-sync,可以看到4个broker文件,表示2Master2Slave的RocketMQ集群。 -rw-r--r-- 1 houjing admin 922 3 4 09:59 broker-a-s.properties -rw-r--r-- 1 houjing admin 928 3 4 09:59 broker-a.properties -rw-r--r-- 1 houjing admin 922 3 4 09:59 broker-b-s.properties -rw-r--r-- 1 houjing admin 928 3 4 09:59 broker-b.properties 192:2m-2s-sync houjing$ pwd /Volumes/work/rocketmq-all-4.7.0-bin-release/conf/2m-2s-sync我们依次对4个properties进行配置配置: #mq-master01节点配置/data/rocketmq/conf/2m-2s-sync/broker-a.properties [root@mq-master01 ~]# vim /data/rocketmq/conf/2m-2s-sync/broker-a.properties #所属集群名字 brokerClusterName=rocketmq-cluster #broker名字,注意此处不同的配置文件填写的不一样 例如:在a.properties 文件中写 broker-a 在b.properties 文件中写 broker-b brokerName=broker-a #0 表示 Master,>0 表示 Slave brokerId=0 #nameServer地址,这里nameserver是单台,如果nameserver是多台集群的话,就用分号分割(即namesrvAddr=ip1:port1;ip2:port2;ip3:port3) namesrvAddr=192.168.10.207:9876; #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数。由于是4个broker节点,所以设置为4 defaultTopicQueueNums=4 #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=10911 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/data/rocketmq/store #commitLog 存储路径 storePathCommitLog=/data/rocketmq/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/data/rocketmq/store/consumequeue #消息索引存储路径 storePathIndex=/data/rocketmq/store/index #checkpoint 文件存储路径 storeCheckpoint=/data/rocketmq/store/checkpoint #abort 文件存储路径 abortFile=/data/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRole=MASTER #要配置为MASTER或SLAVE的角色 #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=SYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128 #mq-master02节点配置的是/data/rocketmq/conf/2m-2s-sync/broker-b.properties #就下面三行配置不一样,其他配置行都一样! [root@mq-master02 software]# vim /data/rocketmq/conf/2m-2s-sync/broker-b.properties ...... brokerName=broker-b brokerId=0 brokerRole=MASTER #mq-slave01节点配置的是/data/rocketmq/conf/2m-2s-sync/broker-a-s.properties [root@mq-slave01 software]# vim /data/rocketmq/conf/2m-2s-sync/broker-a-s.properties ...... brokerName=broker-a #注意这一行的名称要和master保持一致 brokerId=1 #这个ID要跟master的不一致! brokerRole=SLAVE #要配置为从 #mq-slave02节点配置的是/data/rocketmq/conf/2m-2s-sync/broker-b-s.properties [root@mq-slave02 software]# vim /data/rocketmq/conf/2m-2s-sync/broker-b-s.properties ...... brokerName=broker-b #注意这一行的名称要和master的保持一致 brokerId=1 #这个ID要跟master的不一致 brokerRole=SLAVE #要配置为从注意要创建存储&日志文件,如果一台虚拟机部署多个需要用文件进行区分。 修改日志配置文件,如: mkdir -p /root/svr/rocketmq/logs cd /root/svr/rocketmq/conf && sed -i 's#${user.home}#/root/svr/rocketmq#g'*.xml注意logback.*.xml配置文件中${user.home}需要替换自己指定的目录 同时要进行默认参数修改,runbroker.sh,runserver.sh启动参数默认对jvm的堆内存设置比较大(不改启动不起来),如果是虚拟机非线上环境需要改下参数,大小可以根据自己机器来决定默认大小: -Xms8g -Xmx8g -Xmn4g 改为: JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"配置完成后跟单机部署一样先启动NameServer,然后启动Broker即可。附RocketMQ常用命令: 需要切换到bin目录下,即: [root@mq-master01 ~]# cd /data/rocketmq/bin [root@mq-master01 bin]# 获取所有可用命令: [root@mq-master01 bin]# sh mqadmin 查看帮助: # sh mqadmin -h 查询Producer的网络连接情况: # sh mqadmin producerConnection -n localhost:9876 -g -t 查询Consumer的网络连接情况: # sh mqadmin consumerConnection -n localhost:9876 -g 查询Consumer的消费状态: # sh mqadmin consumerProgress -n localhost:9876 -g 查询消息是否发送成功 获取指定Topic: # sh mqadmin topicList -n localhost:9876 | grep 查看Topic状态: # sh mqadmin topicStatus -n localhost:9876 -t 根据offset获取消息: # sh sh mqadmin queryMsgByOffset -n localhost:9876 -b -i -o -t 根据offsetMsgId查询消息: # sh sh mqadmin queryMsgById -n localhost:9876 -i 查询消息是否被消费成功 查询消息详情: # sh mqadmin queryMsgById -i {MsgId} -n {NameServerAddr} 查看Consumer Group订阅了哪些TOPIC: # sh mqadmin consumerProgress -g -n 查询TOPIC被哪些Consumer Group订阅了 没有查询特定TOPIC订阅情况,只能查询所有后再过滤: # sh mqadmin statsAll -n | grep 返回结果:#Topic #Consumer Group #InTPS #OutTPS #InMsg24Hour #OutMsg24Hour 关闭nameserver和所有的broker: # sh mqshutdown namesrv # sh mqshutdown broker 查看所有消费组group: # sh mqadmin consumerProgress -n 192.168.23.159:9876 查看指定消费组(kevinGroupConsumer)下的所有topic数据堆积情况: # sh mqadmin consumerProgress -n 192.168.23.159:9876 -g kevinGroupConsumer 查看所有topic : # sh mqadmin topicList -n 192.168.23.159:9876 查看topic信息列表详情统计 # sh mqadmin topicstatus -n 192.168.23.159:9876 -t myTopicTest1 新增topic # sh mqadmin updateTopic –n 10.45.47.168 –c DefaultCluster –t ZTEExample 删除topic # sh mqadmin deleteTopic –n 10.45.47.168:9876 –c DefaultCluster –t ZTEExample 五、RocketMQ Console安装部署 1、rocketmq-console介绍RocketMQ-Console是RocketMQ项目的扩展插件,是一个图形化管理控制台,提供Broker集群状态查看,Topic管理,Producer、Consumer状态展示,消息查询等常用功能,这个功能在安装好RocketMQ后需要额外单独安装、运行。 2、rocketmq-console下载、部署进入rocketmq-externals项目的GitHub地址,如下图,可看到RocketMQ项目的诸多扩展项目,其中就包含我们需要下载的rocketmq-console。 中文注释是为了方便解释,请删除,不然打包报错:Not allow chinese character ! // 进入目录 $ cd rocketmq-externals/rocketmq-console/ // 修改maven项目的资源文件 $ vim src/main/resources/application.properties #管理后台访问上下文路径,默认为空,如果填写,一定要前面加“/”,后面不要加,否则启动报错 server.contextPath=/rocketmq #访问端口 server.port=8080 ### SSL setting 默认就行 #server.ssl.key-store=classpath:rmqcngkeystore.jks #server.ssl.key-store-password=rocketmq #server.ssl.keyStoreType=PKCS12 #server.ssl.keyAlias=rmqcngkey #spring.application.index=true spring.application.name=rocketmq-console spring.http.encoding.charset=UTF-8 spring.http.encoding.enabled=true spring.http.encoding.force=true #logback配置文件路径,先默认即可 logging.config=classpath:logback.xml #if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876 #Name Server地址,修改成你自己的服务地址。多个地址用英文分号“;”隔开 rocketmq.config.namesrvAddr=localhost:9876 #if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true rocketmq.config.isVIPChannel= #rocketmq-console's data path:dashboard/monitor rocketmq.config.dataPath=/tmp/rocketmq-console/data #set it false if you don't want use dashboard.default true rocketmq.config.enableDashBoardCollect=true #set the message track trace topic if you don't want use the default one rocketmq.config.msgTrackTopicName= rocketmq.config.ticketKey=ticket #Must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required rocketmq.config.loginRequired=false注意: Name Server地址默认为空,可以在启动项目后在后台配置;或者启动服务的时候给出rocketmq.config.namesrvAddr参数值。 4、将项目打成jar包,并运行jar文件 $ mvn clean package -Dmaven.test.skip=true $ java -jar target/rocketmq-console-ng-1.0.0.jar #如果配置文件没有填写Name Server的话,可以在启动项目时指定namesrvAddr $ java -jar target/rocketmq-console-ng-1.0.0.jar --rocketmq.config.namesrvAddr='localhost:9876' #因为本文在打包时配置了namesrvAddr,故而执行如下命令 $ java -jar target/rocketmq-console-ng-1.0.0.jar 5、IDEA上直接运行Console我们也可以直接在IntelliJ上启动Console,console插件本身就是一个项目,可以直接在IDEA导入项目并运行,同上进行application.properties修改,然后找到app启动类启动,如下图所示: 浏览器访问如下: 如上图所示,DashBoard展示了一些Broker和Topic的基本信息。 1、NameSrvAdd配置 NameServer除了可以在首次启动时在Application.properties中指定,还可以在Console启动后进行配置。 2、集群Cluster 主题可在此处进行增删查改,其中TopicTest是我们RocketMQ的默认主题。 我们可以进行手动添加: RocketMQ自带的默认Topic:TopicTest,默认有四个队列。 3.3、发送消息 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |