RocketMQ消息类型及实现

您所在的位置:网站首页 rocketmq消息清理机制 RocketMQ消息类型及实现

RocketMQ消息类型及实现

2022-05-14 11:19| 来源: 网络整理| 查看: 265

订阅机制和定时消息发布订阅的基本概念

image image pull的方式,需要重写偏移量

package com.study.rocketmq.a151_simple; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 普通消息消费者 */ public class PullConsumer { public static final String NAME_SERVER_ADDR = "192.168.100.242:9876"; public static void main(String[] args) throws Exception { // 1. 创建消费者(Pull)对象 DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("GROUP_TEST"); // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步 consumer.setNamesrvAddr(NAME_SERVER_ADDR); consumer.start(); // 3. 获取到对于topic的queue列表 Set messageQueues = consumer.fetchSubscribeMessageQueues("TopicTest"); Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { try { // 4. 循环遍历 for (MessageQueue messageQueue : messageQueues) { // 5. 获取读取位置 long offset = consumer.fetchConsumeOffset(messageQueue, true); // 6. 从指定位置取queue中的消息,每次最多10条。 如果没有则阻塞等待 PullResult pullResult = consumer.pullBlockIfNotFound(messageQueue, null, offset, 10); // 7. 存储Offset,客户端每隔5s会定时刷新到Broker() System.out.println(pullResult.getNextBeginOffset()); consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset()); // 8. 遍历结果 if (pullResult.getPullStatus() == PullStatus.FOUND) { List messageExtList = pullResult.getMsgFoundList(); for (MessageExt messageExt : messageExtList) { System.out.printf("线程:%-25s 接收到新消息 %s --- %s %n", Thread.currentThread().getName(), messageExt.getTags(), new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET)); } } } } catch (Exception e) { e.printStackTrace(); } }, 1000L, 1000L, TimeUnit.MILLISECONDS); } }

push的方式

package com.study.rocketmq.a151_simple; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.io.UnsupportedEncodingException; import java.util.List; /** * 普通消息消费者 */ public class Consumer { public static final String NAME_SERVER_ADDR = "172.16.20.246:9876"; public static void main(String[] args) throws MQClientException { // 1. 创建消费者(Push)对象 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_TEST"); // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步 consumer.setNamesrvAddr(NAME_SERVER_ADDR); consumer.setMaxReconsumeTimes(-1);// 消费重试次数 -1代表16次 // 3. 订阅对应的主题和Tag consumer.subscribe("TopicTest", "*"); // 4. 注册消息接收到Broker消息后的处理接口 consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { MessageExt messageExt = list.get(0); System.out.printf("线程:%-25s 接收到新消息 %s --- %s %n", Thread.currentThread().getName(), messageExt.getTags(), new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET)); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }); // 5. 启动消费者(必须在注册完消息监听器后启动,否则会报错) consumer.start(); System.out.println("已启动消费者"); } } RocketMQ订阅模式的实现

image

package com.study.rocketmq.a154_broadcast; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; /** * 发布订阅消息生产者 */ public class BroadcastProducer { public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException, UnsupportedEncodingException { // 1. 创建生产者对象 DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST"); // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步 producer.setNamesrvAddr("192.168.100.242:9876"); // 3. 启动生产者 producer.start(); // 4. 生产者发送消息 for (int i = 0; i < 10; i++) { Message message = new Message("TopicTest", "TagA", "OrderID_" + i, ("Hello Broadcast:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult result = producer.send(message); System.out.printf("发送结果:%s%n", result); } // 5. 停止生产者 producer.shutdown(); } } package com.study.rocketmq.a154_broadcast; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.io.UnsupportedEncodingException; import java.util.List; /** * 发布订阅消息消费者 */ public class BroadcastConsumer { public static void main(String[] args) throws MQClientException { // 1. 创建消费者(Push)对象 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_TEST"); // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步 consumer.setNamesrvAddr("192.168.100.242:9876"); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 * 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); /** * 3. 设置消息模式,默认是CLUSTERING * MessageModel.BROADCASTING 广播消费模式 * MessageModel.CLUSTERING 集群消费模式 */ consumer.setMessageModel(MessageModel.BROADCASTING); // 4. 订阅对应的主题和Tag consumer.subscribe("TopicTest", "TagA || TagB || TagC"); // 5. 注册消息接收到Broker消息后的处理接口 consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { MessageExt messageExt = list.get(0); try { System.out.printf("线程:%-25s 接收到新消息 %s --- %s %n", Thread.currentThread().getName(), messageExt.getTags(), new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET)); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 6. 启动消费者(必须在注册完消息监听器后启动,否则会报错) consumer.start(); System.out.println("已启动消费者"); } }

BroadcastConsumer 需要最少启动两个才能更好的看到效果

MessageModel.BROADCASTING 广播消费模式,所有订阅了同一个主题的消费者都会收到相同的消息 MessageModel.CLUSTERING 集群消费模式,此集群不是指集群环境中的集群,但概念相同,即可实现消息的负载均衡。当有多个消费者订阅了相同的主题时,同一条消息只会有一个消费者消费 例如:某个Topic有9条消息,有3个消费者,广播模式就是每个消费者都收到9条消息,集群模式就是消费者平均分摊9条消息

定时消息的基本概念

image

package com.study.rocketmq.a155_schedule; import org.apache.commons.lang3.RandomUtils; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.TimeUnit; /** * 定时消息生产者 */ public class ScheduledMessageProducer { public static final String NAME_SERVER_ADDR = "192.168.100.242:9876"; public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException, UnsupportedEncodingException { // 1. 创建生产者对象 DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST"); // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步 producer.setNamesrvAddr(NAME_SERVER_ADDR); // 3. 启动生产者 producer.start(); for (int i = 0; i < 10; i++) { String content = "Hello scheduled message " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS").format(new Date()); Message message = new Message("TopicTest", content.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 4. 设置延时等级,此消息将在10秒后传递给消费者 // 可以在broker服务器端自行配置messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h message.setDelayTimeLevel(3); // 5. 发送消息 SendResult result = producer.send(message); System.out.printf("发送结果:%s%n", result); TimeUnit.MILLISECONDS.sleep(RandomUtils.nextInt(300, 800)); } // 6. 停止生产者 producer.shutdown(); } } package com.study.rocketmq.a155_schedule; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.io.UnsupportedEncodingException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; /** * 定时消息消费者 */ public class ScheduledMessageConsumer { public static final String NAME_SERVER_ADDR = "192.168.100.242:9876"; public static void main(String[] args) throws MQClientException { // 1. 创建消费者(Push)对象 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_TEST"); // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步 consumer.setNamesrvAddr(NAME_SERVER_ADDR); // 3. 订阅对应的主题和Tag consumer.subscribe("TopicTest", "*"); // 4. 注册消息接收到Broker消息后的处理接口 consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { MessageExt messageExt = list.get(0); try { System.out.printf("%-25s 接收到新消息 --- %s %n", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS").format(new Date()), new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET)); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 5. 启动消费者(必须在注册完消息监听器后启动,否则会报错) consumer.start(); System.out.println("已启动消费者"); } }

可以在broker服务器端自行配置messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h image

批量消息和事务消息批量消息

image

package com.study.rocketmq.a156_batch; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.List; /** * 批量消息生产者 */ public class BatchMessageProducer { public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException, UnsupportedEncodingException { // 1. 创建生产者对象 DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST"); // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步 producer.setNamesrvAddr("192.168.100.242:9876"); // 3. 启动生产者 producer.start(); List messages = new ArrayList(); for (int i = 0; i < 32; i++) { String content = "Hello batch message " + i; Message message = new Message("TopicTest", content.getBytes(RemotingHelper.DEFAULT_CHARSET)); messages.add(message); } // 5. 发送消息 SendResult result = producer.send(messages); System.out.println("消息已发送:" + result); // 6. 停止生产者 producer.shutdown(); } } package com.study.rocketmq.a156_batch; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.io.UnsupportedEncodingException; import java.util.List; /** * 批量消息消费者 */ public class BatchMessageConsumer { public static void main(String[] args) throws MQClientException { // 1. 创建消费者(Push)对象 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_TEST"); // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步 consumer.setNamesrvAddr("192.168.100.242:9876"); // 3. 订阅对应的主题和Tag consumer.subscribe("TopicTest", "*"); // 4. 设置消息批处理数量,即每次最多获取多少消息,默认是1 consumer.setConsumeMessageBatchMaxSize(10); // 5. 注册接收到Broker消息后的处理接口 consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { // 设置消息批处理数量后,list中才会有多条,否则每次只会有一条 for (MessageExt messageExt : list) { System.out.printf("线程:%-25s 接收到新消息 --- %s %n", Thread.currentThread().getName(), new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET)); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 6. 启动消费者(必须在注册完消息监听器后启动,否则会报错) consumer.start(); System.out.println("已启动消费者"); } }

image 一个topic默认有4个queue,因此会创建4个线程。 每个都会消费10条数据 image

事务消息

image image 事务消息代码

package com.study.rocketmq.a157_transaction; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.io.UnsupportedEncodingException; import java.util.List; /** * 事务消息消费者 */ public class TransactionMessageConsumer { public static void main(String[] args) throws MQClientException { // 1. 创建消费者(Push)对象 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_TEST"); // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步 consumer.setNamesrvAddr("192.168.100.242:9876"); // 3. 订阅对应的主题和Tag consumer.subscribe("TopicTest", "*"); // 5. 注册接收到Broker消息后的处理接口 consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { MessageExt messageExt = list.get(0); System.out.printf("线程:%-25s 接收到新消息 %s --- %s %n", Thread.currentThread().getName(), messageExt.getTags(), new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET)); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 6. 启动消费者(必须在注册完消息监听器后启动,否则会报错) consumer.start(); System.out.println("已启动消费者"); } } package com.study.rocketmq.a157_transaction; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.io.IOException; /** * 事务消息生产者 */ public class TransactionMessageProducer { /** * 事务消息监听实现 */ private final static TransactionListener transactionListenerImpl = new TransactionListener() { /** * 在发送消息成功时执行本地事务 * @param msg * @param arg producer.sendMessageInTransaction的第二个参数 * @return 返回事务状态 * LocalTransactionState.COMMIT_MESSAGE:提交事务,提交后broker才允许消费者使用 * LocalTransactionState.RollbackTransaction:回滚事务,回滚后消息将被删除,并且不允许别消费 * LocalTransactionState.Unknown:中间状态,表示MQ需要核对,以确定状态 */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { // TODO 开启本地事务(实际就是我们的jdbc操作) // TODO 执行业务代码(插入订单数据库表) // int i = orderDatabaseService.insert(....) // TODO 提交或回滚本地事务(如果用spring事务注解,这些都不需要我们手工去操作) // 模拟一个处理结果 int index = 8; /** * 模拟返回事务状态 */ switch (index) { case 3: System.out.printf("本地事务回滚,回滚消息,id:%s%n", msg.getKeys()); return LocalTransactionState.ROLLBACK_MESSAGE; case 5: case 8: return LocalTransactionState.UNKNOW; default: System.out.println("事务提交,消息正常处理"); return LocalTransactionState.COMMIT_MESSAGE; } } /** * Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group的Producer), * 由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback * @param msg * @return 返回事务状态 */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { // 根据业务,正确处理: 订单场景,只要数据库有了这条记录,消息应该被commit String transactionId = msg.getTransactionId(); String key = msg.getKeys(); System.out.printf("回查事务状态 key:%-5s msgId:%-10s transactionId:%-10s %n", key, msg.getMsgId(), transactionId); if ("id_5".equals(key)) { // 刚刚测试的10条消息, 把id_5这条消息提交,其他的全部回滚。 System.out.printf("回查到本地事务已提交,提交消息,id:%s%n", msg.getKeys()); return LocalTransactionState.COMMIT_MESSAGE; } else { System.out.printf("未查到本地事务状态,回滚消息,id:%s%n", msg.getKeys()); return LocalTransactionState.ROLLBACK_MESSAGE; } } }; public static void main(String[] args) throws MQClientException, IOException { // 1. 创建事务生产者对象 // 和普通消息生产者有所区别,这里使用的是TransactionMQProducer TransactionMQProducer producer = new TransactionMQProducer("GROUP_TEST"); // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步 producer.setNamesrvAddr("192.168.100.242:9876"); // 3. 设置事务监听器 producer.setTransactionListener(transactionListenerImpl); // 4. 启动生产者 producer.start(); for (int i = 0; i < 10; i++) { String content = "Hello transaction message " + i; Message message = new Message("TopicTest", "TagA", "id_" + i, content.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 5. 发送消息(发送一条新订单生成的通知) SendResult result = producer.sendMessageInTransaction(message, i); System.out.printf("发送结果:%s%n", result); } System.in.read(); // 6. 停止生产者 producer.shutdown(); } }

image image image



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3