RabbitMQ
RabbitMQ
####基本概念以及AMQP协议
RabbitMQ 是一个开源的消息代理和队列服务器,用来通过普通协议实现跨平台,跨语言共享数据,使用Erlang语言开发,基于AMQP协议。
**Erlang 有着和原生Socket一样的延迟**
AMQP 是具有特征的二进制协议,一个提供统一消息的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计
* server(broker) 接受客户端连接,实现amqp实体服务
* connection,连接,应用程序与broker的网络连接
* channel 网络信道 几乎所有操作都在channel中进行,是进行消息读写的通道,客户端可建立多个channel,每个channel代表一个会话任务。
* message 服务器和应用程序之间传送的数据,properties和body组成,properties对消息进行修饰,比如消息的优先级,延迟等高级特性,body是消息体的内容。
* virtual host (虚拟地址) 用于进行逻辑隔离,是最上层的消息路由,一个virtual host 里面可以有若干个exchange和queue,同一个virtual host里面不能有相同名称的exchange或queue
* exchange (交换机) 接收消息,根据路由键转发消息到绑定的队列
* binding exchange和queue之间的虚拟连接 binding中包含routing key
* routing key 一个路由规则 虚拟机可用它来确定如何路由一个特定消息
* queue (message queue 消息队列)保存消息并将它转发给消费者
####Exchange
(交换机) 接收消息,根据路由键转发消息到绑定的队列
* name :交换机名称
* type :交换机类型 direct,topic,fanout,headers
* durability :是否持久化,ture
* Auto Delete :当最后一个绑定的exchange上的队列删除后,是否自动删除exchange
* internal :当前exchange是否用于mq内部使用,默认为false
* Arguments :扩展参数,用于扩展amqp协议自定制使用
* Direct Exchange
* 所有发送到direct exchange的消息被转发到route key中指定的queue ps : 默认routeingKey和queueName 相同
* Topic Exchange
* 所有发送到**topic exchange**的消息被转发到所有关心route key中指定**topic**的queue
* exchange将ruoteKey和某个topic进行模糊匹配 此时队列需要绑定一个topic
* 可以使用通配符进行匹配
* # 匹配一个或者多个词
* * 匹配不多不少一个词
* Fanout Exchange
* 不处理任何的路由键,只需要简单的将队列绑定到交换机上
* 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
* fanout exchange换发消息是最快的
##### Binding绑定
* exchange和exchange,queue之间的连接关系
* Binding 中可以包含RoutingKey或者参数
##### Queue 消息队列
* 消息队列 实际存贮消息数据
* Durability :是否持久化 Durable : 是 Transient :否
* Aoto delete : 如选yes 代表当最后一个监听被移除之后该queue会被自动删除
##### Message 消息
* 服务器和应用程序之间传送的数据
* 本质上就是一段数据,由properties和payload(body)组成
* 常用属性 : delivery mode, headers(自定义属性)
* content_type,content_encoding,priority
* correlation_id,reply_to,expiration,message_id
##### Virtual host 虚拟主机
* 虚拟地址用于进行逻辑隔离,是最上层的消息路由,
* 一个virtual host 里面可以有若干个exchange和queue,
* 同一个virtual host里面不能有相同名称的exchange或queue
#### 高级特性
* 如何保障消息的100%投递成功
* 保障消息成功发出
* 保障mq节点成功接收
* 发送端收到mq节点(Broker) 确认应答
* 完善消息补偿机制
* 可靠性投递解决方案
* 消息信心落库,对消息状态进行打标 (图1)
* 消息的延迟投递,做二次确认,回调检查 (图2)
图一
生产端-可靠性投递
图二
生产端-可靠性投递
*幂等性
并发场景下多次操作结果一致。
* 幂等性的解决方案
* 唯一id+指纹码机制,利用数据库主键去重
* 利用redis的原子性去实现
*Confirm 确认消息
* 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给生产者一个应答。
* 生产者进行接收应答,用来确认消息是否正常发送到Broker,做可靠性投递保障。
* 实现Confirm确认消息
* 在channel上开启确认模式 **channel.confrimSelect()**
* 在channel上添加监听,**addConfrimListener** 监听处理结果
* Return Listener 机制
处理生产者投递消息不可达机制
* Mandatory默认为false 如果是默认值 broker自动删除该消息
* 自定义消费端监听
extends DefaultConsumer
* 消费端限流
* broker堆积大量未处理数据,消费端消费能力有限。
* rabbitmq 提供一种qos(服务质量保证),在非确认消息的前提下,如果一定数目的消息(基于consumer或者channel设置qos值)未被确认前,不进行消费新的消息。
* void BasicQos(prefetchSize,prefetchCount,global)
* prefetchSize :消息限制大小 0 表示不限制
* prefetchCount :一次最多推送的消息个数
* 在no_ack=false 生效
* global :限流策略级别(T : channel F :consumer)
* Ack与重回队列
* 手动ack和Nack
* 消费端进行消费的时候,如果由于业务异常我们可以进行体制记录,然后进行补偿。
* 宕机等问题,进行手动Ack保障
---------------------------------------------------------------
* 消费端重回队列是为了对处理异常消息,重新投递
* 一般应用建议关闭重回队列
* TTL
* time to live 消息过期时间
* 消息过期
* 队列过期
* 死信队列(DLX)
DLX是一个正常的exchange,可以在任何队列上被指定,实际就是设置某个队列的属性,当队列中有死信时,mq会自动将这个消息重新发布到exchange上去,进而被路由到另一个队。
* dead-letter-exchange
* 消息被拒绝(basic.reject/basic.nack)并且requeue=false
* 消息TTL过期
* 队列达到最大长度
* 队列上添加参数 : arguments.put(x-dead-letter-exchange,dlx.exchange);
##### 整合
* spring
* RabbitaAdmin
* RabbitTemplate
* SimpleMessageLIstenerContainer(动态修改配置)
* 设置事务特性,事务管理器,事务属性,事务容量,是否开启事务,回滚消息等
* 设置消费者数量,最小最大数量,批量消费
* MessageListenerAdapter (消息监听适配器)
* DefaultListenerMethod : 设置监听方法名称
* Delegate 委托对象:实际真实的委托对象,用于处理消息
* QueueOrTagToMethodName 队列标识与方法名称组成匹配
* MessageConverter 消息转换器
默认二进制数据格式传输,可以自定义数据格式。 (可转格式 json,jpg,pdf,ppt,media 等)
*toMessage()
*fromMessage()
* springboot
* RabbitTemplate.ConfirmCallBack
实现一个监听器用于监听broker端给我们返回的确认消息
* RabbitTemplate.ReturnCallBack
保证消息对broker端是可达的 如果出现路由键不可达的情况,则使用监听器对不可达消息进行后续处理,保证消息路由成功
* spring.rabbitmq.listener.simple.acknowledge-mode=manual手动ack
* 手动确认模式可以保证消息的可靠性投递,或者消费端失败的时候可以做重回队列,根据业务记录日志等处理
* 可设置监听个数和最大个数,用于控制消费端并发情况
* RabbitListener
```java
@RabbitListener (
bindings = @QueueBinding(value = @Queue (value = test.springboot.queue,durable = true),
exchange =@Exchange(value = test.springboot.exchange,durable = true,type = topic,ignoreDeclarationExceptions = true),
key = test.springboot.exchange.#
)
)
@RabbitHandler
public voidreceiver(Message message, Channel channel){
log.info(消费端收到 :+message.getPayload().toString());
log.info(消费端收到 :+message.getHeaders().get(AmqpHeaders.DELIVERY_TAG));
//手工Ack
try {
channel.basicAck((Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG),false);
} catch (IOException e) {
e.printStackTrace();
}
}
```
* springcloud
springboot cloud stream 整合
##### 集群架构
* 主备模式
HaPorxy
* 远程模式
Shovel模式 跨地域多台mq集群互联
* 镜像模式
* 多活模式多中心
Federation插件Federation Exchange 可以看成是Downstream 从Upstream 主动拉取消息,但不是拉取所有消息,必须在Downstram上已经明确定义的Bingdings关系的Exchange
也就是有实际的物理Queue来接收消息,才会从Upstream拉取消息到Downstream,使用AMQP协议实施代理间通信,Downstream会将绑定关系组合在一起,绑定,解除命令将发送到Upstream交换机
因此 Federation Exchange 只接受具有订阅的消息
(RabbitMQ)宝,都看到这里了你确定不收藏一下??