中间件-RabbitMQ
channel/exchang/queue/virhotst
channel
用于在应用程序和RabbitMQ服务器之间建立通信通道。每个连接(connection)可以包含多个通道,而每个通道都代表一个独立的会话,可以用于发送和接收消息。
exchange
消息经由channel发送到exchange,exchange将会根据绑定关系、routerKey、通配符等信息,路由到对应的消息队列中
queue
即实际用于存储消息的消息队列
virhosts
虚拟机,不同虚拟机间的交换机、队列都相互隔离,一般将不同应用划分为不同的虚拟机
direct/topic/fanout交换机
DirectExchange直连交换机
将消息传输到routerKey匹配并且与自身相绑定的队列中
TopicExchange主题交换机
允许使用通配符的方式来匹配消息的路由键和队列的绑定路由键,例:
- 队列1:绑定键为 "stock.usa.#"
- 队列2:绑定键为 "stock.europe.#"
- 队列3:绑定键为 "stock.#"
现在,假设我们要发布一些与股票相关的消息,并根据不同的股票市场和股票类型将消息路由到不同的队列。路由键的格式通常是由单词组成,用点号分隔,如 "stock.usa.apple"。在主题交换机的匹配规则中,通配符 *
表示匹配一个单词,#
表示匹配零个或多个单词。
- 发布消息:"AAPL stock in USA is rising"
- 路由键:"stock.usa.apple"
- 发布消息:"BMW stock in Europe is declining"
- 路由键:"stock.europe.bmw"
- 发布消息:"Google stock is performing well"
- 路由键:"stock.google"
在这个例子中,根据队列的绑定键和消息的路由键,消息将被路由到适当的队列:
- 队列1(绑定键:"stock.usa.#")会接收第一个消息:"AAPL stock in USA is rising",因为它匹配了 "stock.usa.#" 的绑定键。
- 队列2(绑定键:"stock.europe.#")会接收第二个消息:"BMW stock in Europe is declining",因为它匹配了 "stock.europe.#" 的绑定键。
- 队列3(绑定键:"stock.#")会接收所有三个消息,因为它匹配了 "stock.#" 的绑定键,可以匹配所有以 "stock." 开头的路由键。
FanoutExchange扇形交换机
将消息广播发送到绑定的所有队列中,无视routerKey/通配符等
代码实例
使用流程
使用rabbitmq前需要先在配置类中完成对交换机、队列以及他们直接的绑定关系的声明
提问:在rabbitmq中已经完成了队列绑定关系的声明,还需要再次在代码中声明吗?
答:理论上不需要,但通过在配置类代码中声明的方式,可以实现依靠代码即可完成绑定关系,而不需要再次手动操作rabbitmq管理页面创建队列、交换机并对他们进行绑定,换言之修改代码即可完成对绑定关系的修改,有利于后续维护
@Configuration
public class RabbitConfig {
// 声明队列名、交换机名
public static final String INVOICE_EXCHANGE_NAME = "lottery_invoice_exchange";
public static final String INVOICE_QUEUE_NAME = "lottery_invoice_queue";
public static final String ACTIVITYPARTAKE_EXCHANGE_NAME = "lottery_activityPartake_exchange";
public static final String ACTIVITYPARTAKE_QUEUE_NAME = "lottery_activityPartake_queue";
/**
* 声明发货单交换机
*/
@Bean("invoiceExchange")
public FanoutExchange invoiceExchange() {
return new FanoutExchange(INVOICE_EXCHANGE_NAME,true,false);
}
/**
* 声明发货单队列
*/
@Bean("invoiceQueue")
public Queue invoiceQueue() {
return new Queue(INVOICE_QUEUE_NAME,true,false,false);
}
/**
* 声明发货单绑定关系
*/
@Bean("invoiceQueueBinding")
public Binding invoiceQueueBinding(@Qualifier("invoiceQueue") Queue invoiceQueue, @Qualifier("invoiceExchange") FanoutExchange invoiceExchange) {
return BindingBuilder.bind(invoiceQueue).to(invoiceExchange);
}
/**
* 声明活动领取交换机
*/
@Bean("activityPartakeExchange")
public FanoutExchange activityPartakeExchange() {
return new FanoutExchange(ACTIVITYPARTAKE_EXCHANGE_NAME,true,false);
}
/**
* 声明活动领取队列
*/
@Bean("activityPartakeQueue")
public Queue activityPartakeQueue() {
return new Queue(ACTIVITYPARTAKE_QUEUE_NAME,true,false,false);
}
/**
* 声明活动领取绑定关系
*/
@Bean("activityPartakeQueueBinding")
public Binding activityPartakeQueueBinding(@Qualifier("activityPartakeQueue") Queue invoiceQueue, @Qualifier("activityPartakeExchange") FanoutExchange invoiceExchange) {
return BindingBuilder.bind(invoiceQueue).to(invoiceExchange);
}
}
生产者实例
@Component
public class RabbitProducer {
@Resource
private RabbitTemplate rabbitTemplate;
private Logger logger = LoggerFactory.getLogger(RabbitProducer.class);
// 回调函数
public interface CallBackHandler {
/**
* 成功发送
*/
void onSuccess();
/**
* 发送到交换机失败
*/
void onExchangeException();
/**
* 发送到队列失败
*/
void onQueueException();
}
/**
* 发送中奖物品发货单
* @param invoiceVO 发货单
* @param callBackHandler 回调函数
*/
public void sendLotteryInvoice(InvoiceVO invoiceVO,CallBackHandler callBackHandler) {
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
callBackHandler.onSuccess();
} else {
callBackHandler.onExchangeException();
}
});
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
callBackHandler.onQueueException();
});
// 消息转换
String message = JSON.toJSONString(invoiceVO);
logger.info("发送MQ消息 exchange:{} bizId:{} message:{}", RabbitConfig.INVOICE_EXCHANGE_NAME, invoiceVO.getUId(), message);
rabbitTemplate.convertAndSend(RabbitConfig.INVOICE_EXCHANGE_NAME, "", message);
}
/**
* 发送领取活动记录MQ
* @param activityPartakeRecord 活动领取记录
*/
public void sendLotteryActivityPartakeRecord(ActivityPartakeRecordVO activityPartakeRecord){
// rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
// if (!ack) {
// callBackHandler.onSuccess();
// } else {
// callBackHandler.onExchangeException();
// }
// });
//
// rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// callBackHandler.onQueueException();
// });
String jsonString = JSON.toJSONString(activityPartakeRecord);
logger.info("发送MQ消息(领取活动记录) topic:{} bizId:{} message:{}", RabbitConfig.ACTIVITYPARTAKE_EXCHANGE_NAME, activityPartakeRecord.getUId(), jsonString);
rabbitTemplate.convertAndSend(RabbitConfig.ACTIVITYPARTAKE_EXCHANGE_NAME, "", jsonString);
}
}
回调函数
为了将mq的发送过程与发送结果的处理解耦,即将发送成功和失败的处理交由调用方去定义,需要使用到回调函数,定义一个内部接口,即上述代码中的CallBackHandler,在调用sendLotteryInvoice时传入CallBackHandler的匿名类实现对象,即可完成解耦操作,如下为调用方代码
rabbitProducer.sendLotteryInvoice(invoiceVO, new RabbitProducer.CallBackHandler() {
@Override
public void onSuccess() {
// 4.1 MQ 消息发送完成,更新数据库表 user_strategy_export.mq_state = 1
activityPartake.updateInvoiceMqState(invoiceVO.getUId(), invoiceVO.getOrderId(), Constants.MQState.COMPLETE.getCode());
}
@Override
public void onExchangeException() {
logger.info("消息发送到交换机失败,invoice:{}",invoiceVO);
// 4.2 MQ 消息发送失败,更新数据库表 user_strategy_export.mq_state = 2 【等待定时任务扫码补偿MQ消息】
activityPartake.updateInvoiceMqState(invoiceVO.getUId(), invoiceVO.getOrderId(), Constants.MQState.FAIL.getCode());
}
@Override
public void onQueueException() {
logger.info("消息发送到队列失败,invoice:{}",invoiceVO);
// 4.2 MQ 消息发送失败,更新数据库表 user_strategy_export.mq_state = 2 【等待定时任务扫码补偿MQ消息】
activityPartake.updateInvoiceMqState(invoiceVO.getUId(), invoiceVO.getOrderId(), Constants.MQState.FAIL.getCode());
}
});
消费者实例
消费者需要在消费处理方法上,使用@RabbitListener(queues = "lottery_activityPartake_queue")注解,queues指定的是需要监听的消息队列
@Component
public class LotteryActivityPartakeRecordListener {
private Logger logger = LoggerFactory.getLogger(LotteryActivityPartakeRecordListener.class);
@Resource
private IActivityPartake activityPartake;
/**
* 监听队列
*/
@RabbitListener(queues = "lottery_activityPartake_queue")
public void process(Message message, Channel channel) {
// 获取消息内容
byte[] messageByte = message.getBody();
String messageBody = new String(messageByte);
// 判断消息是否为空
if (StringUtils.isEmpty(messageBody)) {
return;
}
// 处理MQ消息
try{
// 1. 转化对象
ActivityPartakeRecordVO activityPartakeRecordVO = JSON.parseObject(messageBody, ActivityPartakeRecordVO.class);
logger.info("消费MQ消息,异步扣减活动库存 message:{}", messageBody);
// 2. 更新数据库库存【实际场景业务体量较大,可能也会由于MQ消费引起并发,对数据库产生压力,所以如果并发量较大,可以把库存记录缓存中,并使用定时任务进行处理缓存和数据库库存同步,减少对数据库的操作次数】
activityPartake.updateActivityStock(activityPartakeRecordVO);
} catch (Exception e) {
// 发奖环节失败,消息重试。所有到环节,发货、更新库,都需要保证幂等。
logger.error("消费MQ消息,失败 topic:{} message:{}", "lottery_invoice", messageBody);
System.out.printf(e.getMessage());
}
}
}
评论区