RabbitMQ
MQ基础
1.MQ基本概念
MQ 全程 Message Queue(消息队列),是在消息传输过程中保存消息的容器。多用于分布式系统之间进行通信。
优势:
- 应用解耦
- 异步提速
- 削峰填谷
劣势:
- 系统可用性降低
- 系统复制度提高
- 一致性问题
1.1小结
使用MQ需要满足什么条件:
- 生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当动作做完了继续往后走,即所谓异步成为了可能。
- 容许短暂的不一致性。
- 收益超过加入MQ、管理MQ成本。
常见MQ产品:RabbitMQ、RocketMQ、kafka
AMQP,即Advanced Message Queueing Protocol(高级消息队列协议)是应用层协议的开放标准,为面向消息的中间件设计。
AMQP 架构如下:
RabbitMQ 基础架构如下:
1.2RabbitMQ 相关概念
- Broker:接受和分发消息的应用
- Virtual host:处于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似网络中namespace概念。当多个不同的用户使用同一个RabbitMQ server提供服务时,可以划分多个vhost,每个用户在自己的vhost创建exchange / queue等
- Connection:publisher / consumer和broker之间的TCP连接
- Channel:如果每一次访问RabbitMQ都建立一个Connectio,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在Connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQPmethod包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
- Exchange:message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct(point-to-point),topic(publish-subscribe) and fanout(multicast)
- Queue:消息最终被送达这里等待consumer取走
- Binding:exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发权限。
RabbitMQ的6中工作模式:简单模式、work queues、Publish/Subscribe 发布于订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式、Publisher Comfirm。
1.3JMS
即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件的API。
1.4小结
- RabbitMQ 是基于AMQP协议使用Erlang语言开发的一款消息队列产品
- RabbitMQ 提供了6中工作模式
- AMQP 是协议,类比HTTP
- JMS 是API规范接口,类比JDBC
2.RabbitMQ 的安装和配置
2.1RabbitMQ web UI
# 开启管理界面
rabbitmq-plugins enable rabbitmq_managemet
# 启动
service rabbitmq-server start # 启动服务
service rabbitmq-server stop # 停止服务
service rabbitmq-server restart # 重启服务
-
用户管理
-
新增用户
rabbitmqctl add_user 用户名 密码
-
删除用户
rabbitmqctl delete_user 用户名
-
修改密码
rabbitmqctl change_password 用户名 新密码
-
查看当前用户列表
rabbitmqctl list_users
-
-
用户角色
-
角色可分类为五种:administrator、monitoring、policymaker、management、普通用户
-
设置用户角色
rabbitmqctl set_user_tags [administrator]
-
-
用户权限
-
权限有conf、write、read
-
设置用户权限
rabbitmqctl set_permission [-p VHostPath] 用户名 权限
-
查看所有用户权限信息
rabbitmqctl list_permissions [-p VHostPath]
-
清除用户的权限信息
rabbitmqctl clear_permissions [-p VHostPath] 用户名
-
3.RabbitMQ 快速入门
3.1入门程序
需求:使用简单模式完成消息传递。
步骤:
-
创建工程(生产者、消费者)
-
分别添加依赖
<dependencies> <!-- rabbitmq java 客户端 --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>
-
编写生产者发送消息
package person.kuntang.producer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author kun tang * @date 2023/2/27 */ public class RabbitmqProducer { public static void main(String[] args) throws IOException, TimeoutException { // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2、设置参数 factory.setHost("192.168.13.129"); factory.setPort(5672); factory.setVirtualHost("demo"); factory.setUsername("user"); factory.setPassword("user"); // 3、创建连接 Connection Connection connection = factory.newConnection(); // 4、创建 Channel Channel channel = connection.createChannel(); // 5、创建队列 Queue /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 参数: 1、queue:队列名称 2、durable:是否持久化。当mq重启之后还在 3、exclusive: · 是否独占,只能有一个消费者监听这个队列 · 当Connection关闭,是否删除队列 · 4、autoDelete:当没有consumer时,是否自动删除 5、argument:参数 */ // 如果没有一个名字叫hello_world的队列,则创建队列,如果有则不会创建 channel.queueDeclare("hello_world",true,false,false,null); // 6、发送消息 /* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) 参数: 1、exchange:交换机名称。简单模式下交换机会使用默认的"" 2、routingKey:路由名称 3、props:配置信息 4、body:发送消息数据 */ String body = "hello,rabbitmq..."; channel.basicPublish("","hello_world",null,body.getBytes()); // 7、释放资源 channel.close(); connection.close(); } }
-
编写消费者接受消息
// 6、接收消息 /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1、queue:队列名称 2、autoAck:是否自动确认 3、callback:回调对象 */ Consumer consumer = new DefaultConsumer(channel){ /* 回调方法,当收到消息后,自动执行该方法 1、consumerTag:标识 2、envelope:获取信息,交换机、路由key... 3、properties:配置信息 4、body:数据 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumeTag: " + consumerTag); System.out.println("Exchange: " + envelope.getExchange()); System.out.println("RoutingKey: " + envelope.getRoutingKey()); System.out.println("properties: " + properties); System.out.println("body: " + new String(body)); } }; channel.basicConsume("hello_world",true,consumer); // 7、不要关闭资源
4.RabbitMQ 的工作模式
-
Work queues 工作队列模式
-
模式说明
与普通模式相比,多了一些消费端,多个消费端共同消费同一个队列中的消息。
-
应用场景
对于任务过重或任务较多的情况使用工作队列可以提高任务处理速度。
-
-
Pub/Sub 订阅模式
-
模式说明
多了一个Exchange角色,且过程略有变化:
- P:生产者,不再发送到队列,而是发给X(交换机)
- C:一直等待消息到来
- Queue:接受、缓存消息
- Exchange:一方面,接收生产者发送的消息。另一方面,知道如何处理消息。
- Fanout:广播,将消息交给所有绑定到交换机的队列;
- Direct:定向,把消息交给符合指定routing key的队列;
- Topic:通配符,把消息交给符合routing pattern的队列。
- Headers:参数匹配
*Exchange只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,你消息会丢失。
-
-
Routing 路由模式
- 模式说明:
- 队列与交换机绑定不能是任意绑定,而是指定RoutingKey
- 消息发送方在向Exchange发送消息时,也必须指定消息的RoutingKey
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey一直,才会接收到消息
- 模式说明:
-
Topic 通配符模式
Topic 主题模式可以实现Pub/Sub 模式与Routing 路由模式的功能,而且Topic在配置routing key的时候可以使用通配符,更灵活。
5.Spring 整合 RabbitMQ
SpringBoot整合RabbitMQ
5.1生产者
-
创建生产者SpringBoot工程
-
引入依赖坐标
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-start-amqp</artifactId> </dependency>
-
编写yml配置,基本信息配置
-
定义交换机、队列及绑定关系的配置类
@Configuration public class RabbitMQConfig{ public static final String EXCHANGE_NAME = "boot_topic_exchange"; public static final String QUEUE_NAME = "boot_queue"; // 1、交换机 @Bean("bootExchange") public Exchange bootExchange(){ return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); } // 2、Queue队列 @Bean("bootQueue") public Queue bootQueue(){ return QueueBulider.durable(QUEUE_NAME).build(); } // 3、队列和交换机绑定关系Binding /* 1、知道哪个队列 2、知道哪个交换机 3、routing key */ @Bean() public Bind bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs(); } }
-
注入RabbitTemplate,调用方法,完成消息发送
public class ProducerTest{ // 1、注入RabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSend(){ rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.log","boot mq hello"); } }
5.2消费者
-
创建消费者SpringBoot工程
-
引入start,依赖坐标
-
编写yml配置,基本信息配置
-
定义监听类,使用@RabbitListener注解完成队列监听
@Component public class RabbitMQListener{ @RabbitListener(queues="boot_queue") public void ListenerQueue(Message message){ System.out.println(new String(message.getBody())); } }
5.3小结
- SpinrgBoot提供了快速整合RabbitMQ的方式
- 基本信息在yml中配置,队列交换机以及绑定关系在配置类中使用Bean的方式配置
- 生产端直接注入RabbitTemplate完成消息发送
- 消费端直接使用@RabbitListener完成消息接收
MQ高级
1.RabbitMQ高级特性
1.1消息可靠性投递
使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败的场景。RabbiMQ为我们提供了两种方式用来控制消息的投递可靠性模式:confirm 确认模式;return 退回模式。
rabbitmq整个消息投递的路径为:
producer ---> rabbitmq broker ---> exchange ---> queue ---> consumer
- 消息从producer到exchange会返回一个confirmCallback
- 消息从exchange到queue投递失败会返回一个returnCallback
- 通过这两个callback控制消息的可靠性投递
确认模式:
- 开启:ConnectionFactory中开启publisher-confirms=
- 在rabbitTemplate定义ConfirmCallback回调函数
回退模式:
-
开启:publisher-returns=true
-
设置RetrunCallback
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){});
-
设置Exchange处理消息模式:如果消息没有路由到Queue,1、丢弃消息;2、返回消息发送方ReturnCallback
1.2Consumer ACK
消费端收到消息后的确认方式。三种确认方式(rabbit:listener-container标签中定义):
- 自动确认:acknowledge="none"
- 手动确认:acknowledge="manual"
- 根据异常情况确认:ackowledge="auto"
自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应的message从rabbitmq的消息缓存中移除。实际业务处理中,可能出现消息收到但业务处理出现异常,那么消息就会丢失。设置手动确认,则需要在业务处理后,调用channel.basicAck(deliveryTag,false),手动签收,如果出现异常,则调用channel.basicNack()或basicReject方法,让其自动重新发送消息。
可靠性总结
- 持久化
- exchange 持久化
- queue 持久化
- message 持久化
- 生产方确认Confirm
- 消费方确认Ack
- Broker高可用
1.3消费端限流
Consumer 限流机制
-
确保ack机制为手动确认acknowledge="manual"
-
listener-container配置属性
perfetch = 1,表示消费端每次从mq拉去一条消息来消费,知道手动确认消费完毕,才会继续拉取下一条消息。
1.4TTL
- 全程Time To Live(存活时间/过期时间)
- 当消息到达存活时间后,还没有被消费,会被自动清除
- RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间
小结
- 设置队列过期时间使用参数:x-message-ttl,单位:ms,火堆整个队列消息统一过期
- 设置消息过期时间使用参数:expiration。单位:ms,当该消息在队列头部时(消费时),会单独判断这一消息是否过期。
- 如果两者都进行了设置,以时间短的为准。
1.5死信队列
Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个队列,这个交换机就算DLX。
消息成为死信的三种情况:
- 队列消息长度到达限度
- 消费者拒绝消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,request=false
- 原队列存在消息过期设置,消息到达超时时间未被消费
队列绑定死信交换机:给队列设置参数:x-dead-letter-exchange和x-dead-letter-routing-key
1.6延迟队列
即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
需求:1、下单后,30分钟未支付,取消订单,回滚库存。2、新用户注册成功7天后,发送短信问候。
实现方式:1、定时器。2、延迟队列。
RabbitMQ中未提供延迟队列功能,但可以使用:TTL+死信队列组合实现延迟队列的效果。
1.7日志与监控
RabbitMQ日志
默认存放路径:/var/log/rabbitmq/rabbit@xxx.log
web管控平台监控
rabbitmqctl管理和监控
1.8消息可靠性分析与追踪
在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。这个时候需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位。在RabbitMQ中使用Firehose和rabbitmq_tracing插件功能来实现消息追踪。
注意:打开trace会影响消息写入功能,适当打开后请关闭。
rabbitctl trace_on:开启Firehose命令
rabbitctl trace_off:关闭Firehose命令
2.RabbitMQ应用问题
2.1消息可靠性保障
-
消息补偿机制
2.2消息幂等性处理
幂等性指一次和多次请求某一个资源,对于资源本身应该具有统一的结果。也就是说,任意多次执行对资源本身所产生的影响均与一次执行的影响相同。
- 乐观锁解决方案
3.RabbitMQ集群搭建
Erlang语言天生具备分布式特性(同步Erlang集群各节点的magic cookie实现),因此RabbitMQ天然支持Clustring,不需要像kafka那样通过ZooKeeper实现HA方案和保存集群的元数据。集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐量能力的目的。