RabbitMQ安装及客户端使用 |
您所在的位置:网站首页 › mongodump命令要装客户端吗 › RabbitMQ安装及客户端使用 |
less /var/log/rabbitmq/[email protected]
RabbitMQ有一套默认的配置,能够满足日常开发需求,如果需要修改,需要自己建立个配置文件 也就是说rabbitmq是不需要我们修改配置的。 一些常使用命令 查看rabbitmq状态 systemctl status rabbitmq![]() 包括一些存放日志的地方 rabbitMQ会绑定一些端口,安装完后,需要将这些端口添加至防火墙。 15674端口:基于websocket的stomp客户端端口 当插件web stomp启用的时候打开 15675端口:基于websocket的mqtt客户端 端口 当web mqtt打开的时候 在bin中能看到下面的命令
文档上面包括所有命令 在rabbitmq中有管理插件,需要手动激活 rabbitmq-plugins enable rabbitmq_management RabbitMQ 有一个默认的用户 “guest” ,但这个用户默认只能通过本机访问,要让其它机器可以访问,需 要创建一个新用户,为其分配权限。 #添加用户 rabbitmqctl add_user admin admin #为用户分配管理权限 rabbitmqctl set_user_tags admin administrator #为用户分配资源权限 https://www.rabbitmq.com/rabbitmqctl.8.html#set_permissions rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" RabbitMQ 的用户角色分类 none 不能访问 management plugin management 用户可以通过 AMQP 做的任何事,外加: 列出自己可以通过AMQP登入的virtual hosts 查看自己的virtual hosts中的queues, exchanges 和 bindings 查看和关闭自己的channels 和 connections 查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动。policymaker management 可以做的任何事外加: 查看、创建和删除自己的virtual hosts所属的policies和parameters monitoring management 可以做的任何事外加: 列出所有virtual hosts,包括他们不能登录的virtual hosts 查看其他用户的connections和channels 查看节点级别的数据如clustering和memory使用情况 查看真正的关于所有virtual hosts的全局的统计信息 administrator policymaker 和 monitoring 可以做的任何事外加: 创建和删除virtual hosts 查看、创建和删除users 查看创建和删除permissions 关闭其他用户的connections 访问web管理控制台http://ip:15672 登录进去包括端口等等,连接 节点名称。 开的通道以及 通道的各种情况 虚拟主机 ![]() RabbitMQ Tutorials — RabbitMQ 提供给例子进行使用和操作 客户端依赖 com.rabbitmq amqp-client 5.7.3使用场景用队列进行交互 利用队列进行交互 这里创建一个 生产者、以及消费者 直接利用ConnectionFactory ,并创建Channel 通道 创建连接工厂设置连接属性从连接工厂获取连接从链接中创建通道声明(创建)队列 如果队列不存在,才会创建 RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错发送消息 public class Producer { public static void main(String[] args) throws Exception { // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2、设置连接属性 factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); // factory.setVirtualHost(virtualHost); try ( // 3、从连接工厂获取连接 Connection connection = factory.newConnection("生产者"); // 4、从链接中创建通道 Channel channel = connection.createChannel();) { /** * 5、声明(创建)队列 如果队列不存在,才会创建 RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错 * * queueDeclare参数说明: * * @param queue * 队列名称 * @param durable * 队列是否持久化 * @param exclusive * 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制 * @param autoDelete * 是否自动删除,当最后一个消费者断开连接之后是否自动删除 * @param arguments * 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等 */ channel.queueDeclare("queue1", false, false, false, null); // 消息内容 String message = "Hello World!"; // 6、发送消息 channel.basicPublish("", "queue1", null, message.getBytes()); System.out.println("发送消息:" + message); } } }在消费者端 创建连接工厂设置连接属性从连接工厂获取连接从链接中创建通道声明(创建)队列 如果队列不存在,才会创建 RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错定义收到消息后的回调开启队列消费 public class Consumer { public static void main(String[] args) throws Exception { // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2、设置连接属性 factory.setHost("rabbitmq.study.com"); factory.setUsername("admin"); factory.setPassword("admin"); String queueName = "queue1"; try ( // 3、从连接工厂获取连接 Connection connection = factory.newConnection("消费者"); // 4、从链接中创建通道 Channel channel = connection.createChannel();) { /** * 5、声明(创建)队列 如果队列不存在,才会创建 RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错 * * queueDeclare参数说明: * * @param queue * 队列名称 * @param durable * 队列是否持久化 * @param exclusive * 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问, * 并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制。 一般在队列和交换器绑定时使用 * @param autoDelete * 是否自动删除,当最后一个消费者断开连接之后是否自动删除 * @param arguments * 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等 */ channel.queueDeclare(queueName, false, false, false, null); // 6、定义收到消息后的回调 DeliverCallback callback = new DeliverCallback() { public void handle(String consumerTag, Delivery message) throws IOException { System.out.println("收到消息:" + new String(message.getBody(), "UTF-8")); } }; // 7、开启队列消费 channel.basicConsume(queueName, true, callback, new CancelCallback() { public void handle(String consumerTag) throws IOException { } }); System.out.println("开始接收消息"); System.in.read(); } } }在运行过后 web管理台上就可以看到非持久化的消息 数据和activemq还是不一样,默认时持久化的,没有消费者时,这个队列的消息是会一直存在的。 工作队列消费者有多个的情况 Work Queue 当队列中的消息处理比较耗时时,我们可以开启多个消息者来并行消费处理消息。 ![]() 只要没有确认,消息是一直存在的。 这里在代码中使用是和上面的不一样的是 消息者 public static void main(String[] args) throws Exception { // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2、设置连接属性 factory.setHost("localhost"); factory.setUsername("admin"); factory.setPassword("admin"); String queueName = "queue1"; try ( // 3、从连接工厂获取连接 Connection connection = factory.newConnection("消费者1"); // 4、从链接中创建通道 Channel channel = connection.createChannel(); // 可以同一连接创建多个通道,也可是不同连接创建通道 来组成多个消费者 // Connection connection2 = factory.newConnection("消费者2"); // Channel channel2 = connection2.createChannel(); Channel channel2 = connection.createChannel();) { // 5、声明(创建)队列 如果队列不存在,才会创建 RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错 channel.queueDeclare(queueName, false, false, false, null); // 在消费者处理一个消息比较耗时时,减少预发来防止消息得不到及时处理 channel.basicQos(1); // accept only one unack-ed message at a time // 6、定义收到消息后的回调 DeliverCallback callback = (consumerTag, message) -> { System.out.println(consumerTag + " 收到消息:" + new String(message.getBody(), "UTF-8")); }; // 7、开启队列消费 channel.basicConsume(queueName, true, callback, consumerTag -> { }); // 第二个消费者 channel2.basicQos(1); // 只预取一个消息 channel2.basicConsume(queueName, true, callback, consumerTag -> { }); System.out.println("开始接收消息"); } }可以创建几个通道取消息,默认是256的预取消息,这里随便设置得值为1.每个通道取一次 在spring中使用只需要在application.yml中添加下面得配置 spring: rabbitmq: host: rabbitmq.study.com port: 5672 username: admin password: admin # listener: # simple: # prefetch: 1使用时添加对应得 RabbitListener 消费者 @RabbitListener(queues = "hello") public void receive(String in) { System.out.println(" [x] Received '" + in + "'"); }生产者 34. 消息传递 (spring.io) 这里配置一个我们要操作的Queue的bean,spring-rabbitmq框架在启动时将从容器中获取这些bean,并向rabbitmq服务器创建这些queue、exchange、binding。在RabbitAdmin.initialize()方法中做的这个事。其完成的工作为:channel.queueDeclare("hello",false, false, false, null);我们也可以自己手动通过 AmqpAdmin.declareXXX(xxx)方法来创建我们需要的queue、exchange、binding。 public class HelloWorldProducer { //spring boot 中 amqp的使用说明: //https://docs.spring.io/spring-boot/docs/2.1.6.RELEASE/reference/html/boot-features-messaging.html#boot-features-amqp /* * 【注意】这里配置一个我们要操作的Queue的bean,spring-rabbitmq框架在启动时将从容器中获取这些bean, * 并向rabbitmq服务器创建这些queue、exchange、binding。 * 在RabbitAdmin.initialize()方法中做的这个事。 * 其完成的工作为:channel.queueDeclare("hello",false, false, false, null); * 我们也可以自己手动通过 AmqpAdmin.declareXXX(xxx)方法来创建我们需要的queue、exchange、binding。 * * @Autowired private AmqpAdmin amqpAdmin; * * public void send() { * ... * this.amqpAdmin.declareQueue(new Queue("hello")); * ... * } */ @Bean public Queue hello() { return new Queue("hello"); } // @Autowired // private AmqpAdmin amqpAdmin; //做queue、exchange、binding的管理用的 @Autowired private RabbitTemplate template; @Autowired private Queue queue; @Scheduled(fixedDelay = 1000) //定时多次发送消息 public void send() { String message = "Hello World!"; this.template.convertAndSend(queue.getName(), message); System.out.println(" [x] Sent '" + message + "'"); } public static void main(String[] args) throws Exception { SpringApplication.run(HelloWorldProducer.class, args); System.in.read(); } }也是为我们简化了写代码 然后在spring中设置 预取数,或者配置中添加都可以 @Bean public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); // factory.setMessageConverter(myMessageConverter()); factory.setPrefetchCount(1); return factory; } publish/subscribeExchange RabbitMQ 消息传递模型的核心思想是,生产者永远不会将任何消息直接发送到队列。实际上,通常生 产者甚至不知道消息是否会被传递到任何队列。 相反,生产者只能向交换器发送消息。交换器是一件非常简单的事情。一边接收来自生产者的消息,另 一边将消息推送到队列。交换器必须确切地知道如何处理它接收到的消息。消息是应该追加到一个特定 的队列?还是要追加到许多队列中? ?或者应该被丢弃。这些规则由 exchange 类型定义。![]() 发送消息 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));当指定exchange 不指定routingkey 在消费者方 创建一个临时队列,名字自动生成(唯一),连接断开,自动删除; 将队列绑定到exchange,绑定时指定的routingKey 也称bingdingKey,在fanout交换器中routingKey无用。 这里主要做绑定队列和exchange建立关联关系 public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); Connection connection2 = factory.newConnection(); Channel channel2 = connection2.createChannel();) { channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 创建一个临时队列,名字自动生成(唯一),连接断开,自动删除 String queueName = channel.queueDeclare().getQueue(); // 将队列绑定到exchange,绑定时指定的routingKey 也称 // bingdingKey,在fanout交换器中routingKey无用。 channel.queueBind(queueName, EXCHANGE_NAME, ""); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(consumerTag + " 收到消息:" + new String(message.getBody(), "UTF-8")); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); // 第二个消费者 String queueName2 = channel2.queueDeclare().getQueue(); channel2.queueBind(queueName2, EXCHANGE_NAME, ""); channel2.basicConsume(queueName2, true, deliverCallback, consumerTag -> { }); System.out.println("开始接收消息"); System.in.read(); } } 在spring中 @Configuration public class PubSubConfiguration { @Bean public FanoutExchange fanout() { return new FanoutExchange("spring-logs"); } @Configuration public static class ReceiverConfig { @Bean public Queue autoDeleteQueue1() { return new AnonymousQueue(); } @Bean public Queue autoDeleteQueue2() { return new AnonymousQueue(); } @Bean public Binding binding1(FanoutExchange fanout, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1).to(fanout); } @Bean public Binding binding2(FanoutExchange fanout, Queue autoDeleteQueue2) { return BindingBuilder.bind(autoDeleteQueue2).to(fanout); } } }然后其他得是一样的。 routing在消费者,拆分开不同的消息,利用routingkey发送到不同的队列 对应消费者 在消费者 这里 的区别在于利用 不同routingkey orange black green进行区分开 相当于分类 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "orange"); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(consumerTag + " 收到消息:" + new String(message.getBody(), "UTF-8")); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); // 第二个消费者 String queueName2 = channel2.queueDeclare().getQueue(); channel2.queueBind(queueName2, EXCHANGE_NAME, "black"); channel2.queueBind(queueName2, EXCHANGE_NAME, "green"); channel2.basicConsume(queueName2, true, deliverCallback, consumerTag -> { });在提供者这端只需要添加 channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));在spring中这个和之前的exchange很像 @Bean public Binding binding1a(DirectExchange direct, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1).to(direct).with("orange"); } @Bean public Binding binding1b(DirectExchange direct, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1).to(direct).with("black"); }绑定时,添加 配置中 Topic利用topic 模式交换 和之前不一样的是,这里采用模糊匹配的方式 ,将粒度更加扩大,多个维度 远程过程调用的方式:客户端 和服务端调用者,远端服务器上的程序,进行返回数据 mq本身就是数据中转站,可以利用mq进行实现。 需要对应生成一个队列 public static void main(String[] argv) { try (RPCClient fibonacciRpc = new RPCClient()) { for (int i = 0; i < 32; i++) { String i_str = Integer.toString(i); System.out.println(" [x] Requesting fib(" + i_str + ")"); String response = fibonacciRpc.call(i_str); System.out.println(" [.] Got '" + response + "'"); } } catch (IOException | TimeoutException | InterruptedException e) { e.printStackTrace(); } } public String call(String message) throws IOException, InterruptedException { final String corrId = UUID.randomUUID().toString(); String replyQueueName = channel.queueDeclare().getQueue(); AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); final BlockingQueue response = new ArrayBlockingQueue(1); String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> { if (delivery.getProperties().getCorrelationId().equals(corrId)) { response.offer(new String(delivery.getBody(), "UTF-8")); } }, consumerTag -> {}); String result = response.take(); channel.basicCancel(ctag); return result; } public void close() throws IOException { connection.close(); }通过队列调用 ,生成数据,消息发布者。 |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |