前言
RabbitMQ作用
:举几个例子,1、系统解耦,A系统无需关心B系统是否执行成功,无需等待B系统响应,直接把操作扔给mq就可以干其他事情了。2、系统使用高峰期,每秒产生10000条消息需要存储,一次性存入数据库恐怕不太行,所以先把数据发送到 RabbitMQ ,然后设置延时队列,每秒从队列取出1000条存入数据库,这样可以减少数据库压力。3、购买商品下订单以后,发送到延时队列,如果20分钟后没有付款,则从队列删除订单,也就是自动取消订单,如果支付了,则取出存入数据库,下单成功。
一、安装 RabbitMQ Windows安装
太简单,自己bing一下
Linux安装
rabbitmq需要erlang语言环境 更新 apt 库,安装 erlang 环境,然后执行 erl
查看是否安装成功
1 2 3 apt update apt install erlang erl
安装 rabbitmq
1 apt install rabbitmq-server
查看 rabbitmq 运行状态
1 systemctl status rabbitmq-server
开启图形化管理界面,然后就可以访问 ip:15672,默认账号密码是 guest
1 rabbitmq-plugins enable rabbitmq_management
默认的guest用户是只能通过本机访问的,所以远程管理后台界面登录需要配置个用户,才能通过外网浏览器访问
1 2 3 4 5 6 rabbitmqctl add_user root root rabbitmqctl set_user_tags root administrator rabbitmqctl set_permissions -p / root “.*” “.*” “.*”
开放防火墙 5672 和 15672 端口
1 2 3 4 5 6 7 8 9 10 11 12 ufw allow 5672 ufw allow 15672 ufw reload iptables -A INPUT -p tcp --dport 5672 -j ACCEPT iptables -A INPUT -p tcp --dport 15672 -j ACCEPT iptables-restore firewall-cmd --zone=public --add-port=5672/tcp --permanent firewall-cmd --zone=public --add-port=15672/tcp --permanent firewall-cmd --reload
二、新建项目
新建一个 provider 一个 consumer,两个 springboot 项目,都需要引入下面的依赖,或者新建的时候勾选自动添加 rabbitmq 的依赖
1、引入依赖 1 2 3 4 5 <dependency > <groupId > org.springframework.amqp</groupId > <artifactId > spring-rabbit-test</artifactId > <scope > test</scope > </dependency >
2、配置 yml
provider 和 consumer 都这样配置,端口改成不一样就行了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 server: port: 8081 spring: rabbitmq: host: 192.168 .0 .105 port: 5672 username: root password: root virtualHost: / publisher-confirm-type: correlated publisher-returns: true
3、启动类开启 Rabbitmq 注解
consumer 和 provider 都需要这个注解
4、配置 provider 的 RabbitmqConfig
大家可以根据 15672 那个图形化管理界面看看下面的一些概念
Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息的载体,每个消息都会被投到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
Producer:消息生产者,就是投递消息的程序.
Consumer:消息消费者,就是接受消息的程序.
Channel:消息通道,在客户端的每个连接里,可建立多个channel.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 package icu.xuyijie.provider.config;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.amqp.support.converter.MessageConverter;import org.springframework.beans.factory.config.ConfigurableBeanFactory;import org.springframework.boot.SpringBootConfiguration;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Scope;import org.springframework.web.filter.CharacterEncodingFilter;import java.util.HashMap;import java.util.Map;@SpringBootConfiguration public class RabbitmqConfig { public static final String QUEUE_MESSAGE = "queue_message" ; public static final String QUEUE_ORDER = "queue_order" ; public static final String EXCHANGE_A = "exchange_A" ; public static final String ROUTING_KEY_MESSAGE = "#.message.#" ; public static final String ROUTING_KEY_ORDER = "#.order.#" ; @Bean public ConnectionFactory connectionFactory () { CachingConnectionFactory connectionFactory = new CachingConnectionFactory ("192.168.0.105" , 5672 ); connectionFactory.setUsername("root" ); connectionFactory.setPassword("root" ); connectionFactory.setVirtualHost("/" ); connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); connectionFactory.setPublisherReturns(true ); return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate () { RabbitTemplate template = new RabbitTemplate (this .connectionFactory()); template.setMessageConverter(this .jsonMessageConverter()); template.setMandatory(true ); return template; } @Bean public MessageConverter jsonMessageConverter () { return new Jackson2JsonMessageConverter (); } @Bean public CharacterEncodingFilter characterEncodingFilter () { CharacterEncodingFilter filter = new CharacterEncodingFilter (); filter.setEncoding("UTF-8" ); filter.setForceEncoding(true ); return filter; } @Bean public Exchange exchangeA () { return ExchangeBuilder.topicExchange(EXCHANGE_A).durable(true ).build(); } @Bean public Queue queueMessage () { return new Queue (QUEUE_MESSAGE, true , false , false ); } @Bean public Queue queueOrder () { return new Queue (QUEUE_ORDER); } @Bean public Binding bindingQueueMessage () { return BindingBuilder.bind(queueMessage()).to(exchangeA()).with(ROUTING_KEY_MESSAGE).noargs(); } @Bean public Binding bindingQueueOrder () { return BindingBuilder.bind(queueOrder()).to(exchangeA()).with(ROUTING_KEY_ORDER).noargs(); } }
5、provider 发送消息
我们因为配置了确认机制,所以我们配置了回调方法,这里使用构造器注入 rabbitTemplate
,如果不配置回调方法,则可以使用 @Autowired
注入,并且类无需实现 RabbitTemplate.ConfirmCallback
,sendExchange 方法没有使用回调方法,使用回调方法的话需要像 sendCallback 方法一样多传一个值 correlationId
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 package icu.xuyijie.provider.controller;import icu.xuyijie.provider.config.RabbitmqConfig;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.util.UUID;import java.util.concurrent.ExecutionException;@RestController @RequestMapping("/provider") public class ProviderController implements RabbitTemplate .ConfirmCallback { private final RabbitTemplate rabbitTemplate; public ProviderController (RabbitTemplate rabbitTemplate) { this .rabbitTemplate = rabbitTemplate; this .rabbitTemplate.setConfirmCallback(this ); } @RequestMapping("/sendExchange") public void sendExchange () { String message = "这是一条发送到exchangeA的消息" ; rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_A, "a.message" , message); } @RequestMapping("/sendCallback") public void sendCallback () { String message = "这是一条发送到exchangeA的消息" ; String callBackId = UUID.randomUUID().toString(); CorrelationData correlationId = new CorrelationData (callBackId); rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_A, "a.message" , message, correlationId); System.out.println("发送回调id: " + callBackId); } @Override public void confirm (CorrelationData correlationData, boolean ack, String s) { assert correlationData != null ; System.out.println("这是回调方法打印的:回调id: " + correlationData.getId()); try { System.out.println("这是回调方法打印的:回调message: " + correlationData.getFuture().get()); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException (e); } if (ack) { System.out.println("这是回调方法打印的:消息发送成功" ); } else { System.out.println("消息发送失败" + s); } } }
6、consumer 接收消息
@RabbitListener
就是监听的队列,可以监听多个
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package icu.xuyijie.consumer.consumer;import com.rabbitmq.client.Channel;import icu.xuyijie.consumer.config.RabbitmqConfig;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component public class ReceiveHandler { @RabbitListener(queues = {"queue_message", "queue_order"}) public void receiveMessage (Message message, Channel channel) { System.out.println("接收 queue_message 和 queue_order 队列的消息 " + message); System.out.println("对应Channel " + channel); } @RabbitListener(queues = {"queue_order"}) public void receiveOrder (Message message, Channel channel) { System.out.println("接收 queue_order 队列的消息" + message); System.out.println("对应Channel " + channel); } }
7、演示
我们直接调用 sendCallback 这个接口
consumer 接收到消息
provider 触发回调方法
三、延时队列 1、在 provider 的 RabbitmqConfig 中增加配置
增加了一个一个交换机、一个队列、一个路由键的配置,注意 delayQueue() 方法,我的注释有解释
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public static final String QUEUE_DELAY = "queue_delay" ; public static final String EXCHANGE_DELAY = "exchange_delay" ; public static final String ROUTER_DELAY_KEY = "router_delay_key" ; @Bean public DirectExchange delayExchange () { return new DirectExchange (EXCHANGE_DELAY); } @Bean public Queue delayQueue () { Map<String, Object> map = new HashMap <>(16 ); map.put("x-dead-letter-exchange" , EXCHANGE_A); map.put("x-dead-letter-routing-key" , ROUTING_KEY_ORDER); return new Queue (QUEUE_DELAY, true , false , false , map); } @Bean public Binding delayBinding () { return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(ROUTER_DELAY_KEY); }
2、在 provider 增加一个接口
发送消息到我们刚刚配置的延时交换机
1 2 3 4 5 6 7 8 9 10 11 @RequestMapping("/sendDelay") public void sendDelay () { rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_DELAY, RabbitmqConfig.ROUTER_DELAY_KEY, "这是一条延时队列的消息" , message -> { message.getMessageProperties().setExpiration("3000" ); return message; }, new CorrelationData (UUID.randomUUID().toString())); System.out.println("延时队列发送成功" ); }
3、consumer 接收延时消息
注意,上面我们把延时消息发送到了延时队列 EXCHANGE_DELAY,但是我们接收,要监听 RabbitmqConfig
中 delayQueue() 方法 配置的 EXCHANGE_A,路由键为 ROUTING_KEY_ORDER,也就是说无需改动 consumer ,consumer 的两个方法都能收到延时消息,因为他们都监听了 ROUTING_KEY_ORDER 对应的队列
直接调用 sendDelay 方法,2次(因为两个方法都监听 queue_order,所以他们会交替获得消息)
3秒后 consumer 的两个方法都能接收到延时消息
总结