RabbitMQ安装及客户端使用

您所在的位置:网站首页 mongodump命令要装客户端吗 RabbitMQ安装及客户端使用

RabbitMQ安装及客户端使用

2024-07-09 18:14| 来源: 网络整理| 查看: 265

less /var/log/rabbitmq/[email protected]

/etc/rabbitmq 默认配置情况下没有配置文件。 官网说明连接: File and Directory Locations — RabbitMQ

基本配置

RabbitMQ有一套默认的配置,能够满足日常开发需求,如果需要修改,需要自己建立个配置文件

也就是说rabbitmq是不需要我们修改配置的。

一些常使用命令

查看rabbitmq状态   systemctl status rabbitmq

 whereis rabbitmq 查看rabbitmq的目录

包括一些存放日志的地方

 通过less [email protected] 可以查询日志的目录等等

如要修改配置,创建配置文件: /etc/rabbitmq/rabbitmq.conf 配置文件示例: https://github.com/rabbitmq/rabbitmq-server/blob/master/docs/rabbitmq.conf.example 配置项说明: https://www.rabbitmq.com/confifigure.html#confifig-items Rabbit端口

rabbitMQ会绑定一些端口,安装完后,需要将这些端口添加至防火墙。

15674端口:基于websocket的stomp客户端端口 当插件web stomp启用的时候打开

15675端口:基于websocket的mqtt客户端 端口 当web mqtt打开的时候

在bin中能看到下面的命令

 rabbitmqctl(8) — RabbitMQ

文档上面包括所有命令

RabbitMQ管理控制台

在rabbitmq中有管理插件,需要手动激活

rabbitmq-plugins enable rabbitmq_management RabbitMQ 有一个默认的用户 “guest” ,但这个用户默认只能通过本机访问,要让其它机器可以访问,需 要创建一个新用户,为其分配权限。 #添加用户 rabbitmqctl add_user admin admin #为用户分配管理权限 rabbitmqctl set_user_tags admin administrator #为用户分配资源权限 https://www.rabbitmq.com/rabbitmqctl.8.html#set_permissions rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" RabbitMQ 的用户角色分类 none 不能访问 management plugin management 用户可以通过 AMQP 做的任何事,外加: 列出自己可以通过AMQP登入的virtual hosts 查看自己的virtual hosts中的queues, exchanges 和 bindings 查看和关闭自己的channels 和 connections 查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动。

policymaker

management 可以做的任何事外加: 查看、创建和删除自己的virtual hosts所属的policies和parameters monitoring management 可以做的任何事外加: 列出所有virtual hosts,包括他们不能登录的virtual hosts 查看其他用户的connections和channels 查看节点级别的数据如clustering和memory使用情况 查看真正的关于所有virtual hosts的全局的统计信息

administrator policymaker 和 monitoring 可以做的任何事外加: 创建和删除virtual hosts 查看、创建和删除users 查看创建和删除permissions 关闭其他用户的connections 访问web管理控制台

http://ip:15672

登录进去包括端口等等,连接 节点名称。 开的通道以及  通道的各种情况

虚拟主机

客户端使用

RabbitMQ Tutorials — RabbitMQ

提供给例子进行使用和操作

 客户端依赖

com.rabbitmq amqp-client 5.7.3

使用场景用队列进行交互

利用队列进行交互

这里创建一个 生产者、以及消费者 直接利用ConnectionFactory ,并创建Channel 通道

创建连接工厂设置连接属性从连接工厂获取连接从链接中创建通道声明(创建)队列 如果队列不存在,才会创建 RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错发送消息 public class Producer { public static void main(String[] args) throws Exception { // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2、设置连接属性 factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); // factory.setVirtualHost(virtualHost); try ( // 3、从连接工厂获取连接 Connection connection = factory.newConnection("生产者"); // 4、从链接中创建通道 Channel channel = connection.createChannel();) { /** * 5、声明(创建)队列 如果队列不存在,才会创建 RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错 * * queueDeclare参数说明: * * @param queue * 队列名称 * @param durable * 队列是否持久化 * @param exclusive * 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制 * @param autoDelete * 是否自动删除,当最后一个消费者断开连接之后是否自动删除 * @param arguments * 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等 */ channel.queueDeclare("queue1", false, false, false, null); // 消息内容 String message = "Hello World!"; // 6、发送消息 channel.basicPublish("", "queue1", null, message.getBytes()); System.out.println("发送消息:" + message); } } }

在消费者端

创建连接工厂设置连接属性从连接工厂获取连接从链接中创建通道声明(创建)队列 如果队列不存在,才会创建 RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错定义收到消息后的回调开启队列消费  public class Consumer { public static void main(String[] args) throws Exception { // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2、设置连接属性 factory.setHost("rabbitmq.study.com"); factory.setUsername("admin"); factory.setPassword("admin"); String queueName = "queue1"; try ( // 3、从连接工厂获取连接 Connection connection = factory.newConnection("消费者"); // 4、从链接中创建通道 Channel channel = connection.createChannel();) { /** * 5、声明(创建)队列 如果队列不存在,才会创建 RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错 * * queueDeclare参数说明: * * @param queue * 队列名称 * @param durable * 队列是否持久化 * @param exclusive * 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问, * 并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制。 一般在队列和交换器绑定时使用 * @param autoDelete * 是否自动删除,当最后一个消费者断开连接之后是否自动删除 * @param arguments * 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等 */ channel.queueDeclare(queueName, false, false, false, null); // 6、定义收到消息后的回调 DeliverCallback callback = new DeliverCallback() { public void handle(String consumerTag, Delivery message) throws IOException { System.out.println("收到消息:" + new String(message.getBody(), "UTF-8")); } }; // 7、开启队列消费 channel.basicConsume(queueName, true, callback, new CancelCallback() { public void handle(String consumerTag) throws IOException { } }); System.out.println("开始接收消息"); System.in.read(); } } }

 在运行过后 web管理台上就可以看到非持久化的消息

数据和activemq还是不一样,默认时持久化的,没有消费者时,这个队列的消息是会一直存在的。

工作队列

消费者有多个的情况 Work Queue 当队列中的消息处理比较耗时时,我们可以开启多个消息者来并行消费处理消息。

RabbitMQ 将以轮询的方式一个一个将消息发给消费者。 有时会出现一些消费者资源紧张,它处理消息慢于其他一些消费者。而RabbitMQ 并不知道各消费者的 情况,它依然是轮询将消息发送给消费者,此时就可能会造成慢消费者上消息积压,消息得不到及时处 理,而快的消费者则出现空闲。为避免这种情况,我们可以将预取量(prefetchCount) 降低( =1 太过保 守,预取量过低会影响吞吐性能)。

只要没有确认,消息是一直存在的。

 这里在代码中使用是和上面的不一样的是 消息者

public static void main(String[] args) throws Exception { // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2、设置连接属性 factory.setHost("localhost"); factory.setUsername("admin"); factory.setPassword("admin"); String queueName = "queue1"; try ( // 3、从连接工厂获取连接 Connection connection = factory.newConnection("消费者1"); // 4、从链接中创建通道 Channel channel = connection.createChannel(); // 可以同一连接创建多个通道,也可是不同连接创建通道 来组成多个消费者 // Connection connection2 = factory.newConnection("消费者2"); // Channel channel2 = connection2.createChannel(); Channel channel2 = connection.createChannel();) { // 5、声明(创建)队列 如果队列不存在,才会创建 RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错 channel.queueDeclare(queueName, false, false, false, null); // 在消费者处理一个消息比较耗时时,减少预发来防止消息得不到及时处理 channel.basicQos(1); // accept only one unack-ed message at a time // 6、定义收到消息后的回调 DeliverCallback callback = (consumerTag, message) -> { System.out.println(consumerTag + " 收到消息:" + new String(message.getBody(), "UTF-8")); }; // 7、开启队列消费 channel.basicConsume(queueName, true, callback, consumerTag -> { }); // 第二个消费者 channel2.basicQos(1); // 只预取一个消息 channel2.basicConsume(queueName, true, callback, consumerTag -> { }); System.out.println("开始接收消息"); } }

可以创建几个通道取消息,默认是256的预取消息,这里随便设置得值为1.每个通道取一次

在spring中使用

只需要在application.yml中添加下面得配置

spring: rabbitmq: host: rabbitmq.study.com port: 5672 username: admin password: admin # listener: # simple: # prefetch: 1

使用时添加对应得 RabbitListener  消费者

@RabbitListener(queues = "hello") public void receive(String in) { System.out.println(" [x] Received '" + in + "'"); }

生产者

34. 消息传递 (spring.io)   

这里配置一个我们要操作的Queue的bean,spring-rabbitmq框架在启动时将从容器中获取这些bean,并向rabbitmq服务器创建这些queue、exchange、binding。在RabbitAdmin.initialize()方法中做的这个事。其完成的工作为:channel.queueDeclare("hello",false, false, false, null);我们也可以自己手动通过 AmqpAdmin.declareXXX(xxx)方法来创建我们需要的queue、exchange、binding。 public class HelloWorldProducer { //spring boot 中 amqp的使用说明: //https://docs.spring.io/spring-boot/docs/2.1.6.RELEASE/reference/html/boot-features-messaging.html#boot-features-amqp /* * 【注意】这里配置一个我们要操作的Queue的bean,spring-rabbitmq框架在启动时将从容器中获取这些bean, * 并向rabbitmq服务器创建这些queue、exchange、binding。 * 在RabbitAdmin.initialize()方法中做的这个事。 * 其完成的工作为:channel.queueDeclare("hello",false, false, false, null); * 我们也可以自己手动通过 AmqpAdmin.declareXXX(xxx)方法来创建我们需要的queue、exchange、binding。 * * @Autowired private AmqpAdmin amqpAdmin; * * public void send() { * ... * this.amqpAdmin.declareQueue(new Queue("hello")); * ... * } */ @Bean public Queue hello() { return new Queue("hello"); } // @Autowired // private AmqpAdmin amqpAdmin; //做queue、exchange、binding的管理用的 @Autowired private RabbitTemplate template; @Autowired private Queue queue; @Scheduled(fixedDelay = 1000) //定时多次发送消息 public void send() { String message = "Hello World!"; this.template.convertAndSend(queue.getName(), message); System.out.println(" [x] Sent '" + message + "'"); } public static void main(String[] args) throws Exception { SpringApplication.run(HelloWorldProducer.class, args); System.in.read(); } }

也是为我们简化了写代码

然后在spring中设置 预取数,或者配置中添加都可以

@Bean public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); // factory.setMessageConverter(myMessageConverter()); factory.setPrefetchCount(1); return factory; } publish/subscribe

Exchange 

RabbitMQ 消息传递模型的核心思想是,生产者永远不会将任何消息直接发送到队列。实际上,通常生 产者甚至不知道消息是否会被传递到任何队列。 相反,生产者只能向交换器发送消息。交换器是一件非常简单的事情。一边接收来自生产者的消息,另 一边将消息推送到队列。交换器必须确切地知道如何处理它接收到的消息。消息是应该追加到一个特定 的队列?还是要追加到许多队列中? ?或者应该被丢弃。这些规则由 exchange 类型定义。 在之前得队列中,利用空字符串当作exchange Exchange types 有 : direct, topic, headers and fanout fanout:扇型交换机 它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中 direct:直连交换机 它会把消息路由到那些 BindingKey 和 RoutingKey 完全匹配的队列中 topic:主题交换机 与 direct 类似,但它可以通过通配符进行模糊匹配 headers:头交换机 不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。 headers 类型的交换器性能很差,而且也不实用。 无名交换器 : "" ,在前面的示例中,我们并没有创建交换器,而是用了系统默认的无名交换器。 channel.basicPublish("", "hello", null, message.getBytes()); 创建 exchange channel.exchangeDeclare(String exchange, String type) channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

发送消息

channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));

当指定exchange 不指定routingkey

在消费者方

创建一个临时队列,名字自动生成(唯一),连接断开,自动删除;

将队列绑定到exchange,绑定时指定的routingKey 也称bingdingKey,在fanout交换器中routingKey无用。 这里主要做绑定队列和exchange建立关联关系 

public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); Connection connection2 = factory.newConnection(); Channel channel2 = connection2.createChannel();) { channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 创建一个临时队列,名字自动生成(唯一),连接断开,自动删除 String queueName = channel.queueDeclare().getQueue(); // 将队列绑定到exchange,绑定时指定的routingKey 也称 // bingdingKey,在fanout交换器中routingKey无用。 channel.queueBind(queueName, EXCHANGE_NAME, ""); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(consumerTag + " 收到消息:" + new String(message.getBody(), "UTF-8")); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); // 第二个消费者 String queueName2 = channel2.queueDeclare().getQueue(); channel2.queueBind(queueName2, EXCHANGE_NAME, ""); channel2.basicConsume(queueName2, true, deliverCallback, consumerTag -> { }); System.out.println("开始接收消息"); System.in.read(); } } 在spring中 @Configuration public class PubSubConfiguration { @Bean public FanoutExchange fanout() { return new FanoutExchange("spring-logs"); } @Configuration public static class ReceiverConfig { @Bean public Queue autoDeleteQueue1() { return new AnonymousQueue(); } @Bean public Queue autoDeleteQueue2() { return new AnonymousQueue(); } @Bean public Binding binding1(FanoutExchange fanout, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1).to(fanout); } @Bean public Binding binding2(FanoutExchange fanout, Queue autoDeleteQueue2) { return BindingBuilder.bind(autoDeleteQueue2).to(fanout); } } }

然后其他得是一样的。

routing

在消费者,拆分开不同的消息,利用routingkey发送到不同的队列 对应消费者

 

 在消费者 这里 的区别在于利用 不同routingkey  orange  black green进行区分开 相当于分类

channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "orange"); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(consumerTag + " 收到消息:" + new String(message.getBody(), "UTF-8")); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); // 第二个消费者 String queueName2 = channel2.queueDeclare().getQueue(); channel2.queueBind(queueName2, EXCHANGE_NAME, "black"); channel2.queueBind(queueName2, EXCHANGE_NAME, "green"); channel2.basicConsume(queueName2, true, deliverCallback, consumerTag -> { });

在提供者这端只需要添加 

channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));

在spring中这个和之前的exchange很像

@Bean public Binding binding1a(DirectExchange direct, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1).to(direct).with("orange"); } @Bean public Binding binding1b(DirectExchange direct, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1).to(direct).with("black"); }

绑定时,添加 配置中

Topic

 利用topic 模式交换  和之前不一样的是,这里采用模糊匹配的方式 ,将粒度更加扩大,多个维度

Binding 绑定 我们已经创建了一个扇出交换器和一个队列。现在我们需要告诉 exchange 向队列发送消息。 exchange 和队列之间的关系称为绑定。 channel.queueBind(queueName, "logs", ""); channel.queueBind(String queue, String exchange, String routingKey)

 RPC方式

 远程过程调用的方式:客户端 和服务端调用者,远端服务器上的程序,进行返回数据

mq本身就是数据中转站,可以利用mq进行实现。

需要对应生成一个队列

public static void main(String[] argv) { try (RPCClient fibonacciRpc = new RPCClient()) { for (int i = 0; i < 32; i++) { String i_str = Integer.toString(i); System.out.println(" [x] Requesting fib(" + i_str + ")"); String response = fibonacciRpc.call(i_str); System.out.println(" [.] Got '" + response + "'"); } } catch (IOException | TimeoutException | InterruptedException e) { e.printStackTrace(); } } public String call(String message) throws IOException, InterruptedException { final String corrId = UUID.randomUUID().toString(); String replyQueueName = channel.queueDeclare().getQueue(); AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); final BlockingQueue response = new ArrayBlockingQueue(1); String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> { if (delivery.getProperties().getCorrelationId().equals(corrId)) { response.offer(new String(delivery.getBody(), "UTF-8")); } }, consumerTag -> {}); String result = response.take(); channel.basicCancel(ctag); return result; } public void close() throws IOException { connection.close(); }

通过队列调用 ,生成数据,消息发布者。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3