中间件-RabbitMQ

channel/exchang/queue/virhotst

channel

用于在应用程序和RabbitMQ服务器之间建立通信通道。每个连接(connection)可以包含多个通道,而每个通道都代表一个独立的会话,可以用于发送和接收消息。

exchange

消息经由channel发送到exchange,exchange将会根据绑定关系、routerKey、通配符等信息,路由到对应的消息队列中

queue

即实际用于存储消息的消息队列

virhosts

虚拟机,不同虚拟机间的交换机、队列都相互隔离,一般将不同应用划分为不同的虚拟机

direct/topic/fanout交换机

DirectExchange直连交换机

将消息传输到routerKey匹配并且与自身相绑定的队列中

TopicExchange主题交换机

允许使用通配符的方式来匹配消息的路由键和队列的绑定路由键,例:

  • 队列1:绑定键为 "stock.usa.#"
  • 队列2:绑定键为 "stock.europe.#"
  • 队列3:绑定键为 "stock.#"

现在,假设我们要发布一些与股票相关的消息,并根据不同的股票市场和股票类型将消息路由到不同的队列。路由键的格式通常是由单词组成,用点号分隔,如 "stock.usa.apple"。在主题交换机的匹配规则中,通配符 * 表示匹配一个单词,# 表示匹配零个或多个单词。

  1. 发布消息:"AAPL stock in USA is rising"
    • 路由键:"stock.usa.apple"
  2. 发布消息:"BMW stock in Europe is declining"
    • 路由键:"stock.europe.bmw"
  3. 发布消息:"Google stock is performing well"
    • 路由键:"stock.google"

在这个例子中,根据队列的绑定键和消息的路由键,消息将被路由到适当的队列:

  • 队列1(绑定键:"stock.usa.#")会接收第一个消息:"AAPL stock in USA is rising",因为它匹配了 "stock.usa.#" 的绑定键。
  • 队列2(绑定键:"stock.europe.#")会接收第二个消息:"BMW stock in Europe is declining",因为它匹配了 "stock.europe.#" 的绑定键。
  • 队列3(绑定键:"stock.#")会接收所有三个消息,因为它匹配了 "stock.#" 的绑定键,可以匹配所有以 "stock." 开头的路由键。

FanoutExchange扇形交换机

将消息广播发送到绑定的所有队列中,无视routerKey/通配符等

代码实例

使用流程

使用rabbitmq前需要先在配置类中完成对交换机、队列以及他们直接的绑定关系的声明

提问:在rabbitmq中已经完成了队列绑定关系的声明,还需要再次在代码中声明吗?

答:理论上不需要,但通过在配置类代码中声明的方式,可以实现依靠代码即可完成绑定关系,而不需要再次手动操作rabbitmq管理页面创建队列、交换机并对他们进行绑定,换言之修改代码即可完成对绑定关系的修改,有利于后续维护

@Configuration
public class RabbitConfig {

    // 声明队列名、交换机名
    public static final String INVOICE_EXCHANGE_NAME = "lottery_invoice_exchange";
    public static final String INVOICE_QUEUE_NAME = "lottery_invoice_queue";

    public static final String ACTIVITYPARTAKE_EXCHANGE_NAME = "lottery_activityPartake_exchange";

    public static final String ACTIVITYPARTAKE_QUEUE_NAME = "lottery_activityPartake_queue";

    /**
     * 声明发货单交换机
     */
    @Bean("invoiceExchange")
    public FanoutExchange invoiceExchange() {
        return new FanoutExchange(INVOICE_EXCHANGE_NAME,true,false);
    }

    /**
     * 声明发货单队列
     */
    @Bean("invoiceQueue")
    public Queue invoiceQueue() {
        return new Queue(INVOICE_QUEUE_NAME,true,false,false);
    }

    /**
     * 声明发货单绑定关系
     */
    @Bean("invoiceQueueBinding")
    public Binding invoiceQueueBinding(@Qualifier("invoiceQueue") Queue invoiceQueue, @Qualifier("invoiceExchange") FanoutExchange invoiceExchange) {
        return BindingBuilder.bind(invoiceQueue).to(invoiceExchange);
    }


    /**
     * 声明活动领取交换机
     */
    @Bean("activityPartakeExchange")
    public FanoutExchange activityPartakeExchange() {
        return new FanoutExchange(ACTIVITYPARTAKE_EXCHANGE_NAME,true,false);
    }

    /**
     * 声明活动领取队列
     */
    @Bean("activityPartakeQueue")
    public Queue activityPartakeQueue() {
        return new Queue(ACTIVITYPARTAKE_QUEUE_NAME,true,false,false);
    }

    /**
     * 声明活动领取绑定关系
     */
    @Bean("activityPartakeQueueBinding")
    public Binding activityPartakeQueueBinding(@Qualifier("activityPartakeQueue") Queue invoiceQueue, @Qualifier("activityPartakeExchange") FanoutExchange invoiceExchange) {
        return BindingBuilder.bind(invoiceQueue).to(invoiceExchange);
    }

}

生产者实例

@Component
public class RabbitProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    private Logger logger = LoggerFactory.getLogger(RabbitProducer.class);

    // 回调函数
    public interface CallBackHandler {
        /**
         * 成功发送
         */
        void onSuccess();

        /**
         * 发送到交换机失败
         */
        void onExchangeException();

        /**
         * 发送到队列失败
         */
        void onQueueException();
    }

    /**
     * 发送中奖物品发货单
     * @param invoiceVO 发货单
     * @param callBackHandler 回调函数
     */
    public void sendLotteryInvoice(InvoiceVO invoiceVO,CallBackHandler callBackHandler)  {

        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (!ack) {
                callBackHandler.onSuccess();
            } else {
                callBackHandler.onExchangeException();
            }
        });

        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            callBackHandler.onQueueException();
        });

        // 消息转换
        String message = JSON.toJSONString(invoiceVO);
        logger.info("发送MQ消息 exchange:{} bizId:{} message:{}", RabbitConfig.INVOICE_EXCHANGE_NAME, invoiceVO.getUId(), message);

        rabbitTemplate.convertAndSend(RabbitConfig.INVOICE_EXCHANGE_NAME, "", message);


    }

    /**
     * 发送领取活动记录MQ
     * @param activityPartakeRecord 活动领取记录
     */
    public void sendLotteryActivityPartakeRecord(ActivityPartakeRecordVO activityPartakeRecord){
//        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
//            if (!ack) {
//                callBackHandler.onSuccess();
//            } else {
//                callBackHandler.onExchangeException();
//            }
//        });
//
//        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
//            callBackHandler.onQueueException();
//        });

        String jsonString = JSON.toJSONString(activityPartakeRecord);
        logger.info("发送MQ消息(领取活动记录) topic:{} bizId:{} message:{}", RabbitConfig.ACTIVITYPARTAKE_EXCHANGE_NAME, activityPartakeRecord.getUId(), jsonString);
        rabbitTemplate.convertAndSend(RabbitConfig.ACTIVITYPARTAKE_EXCHANGE_NAME, "", jsonString);
    }

}

回调函数

为了将mq的发送过程与发送结果的处理解耦,即将发送成功和失败的处理交由调用方去定义,需要使用到回调函数,定义一个内部接口,即上述代码中的CallBackHandler,在调用sendLotteryInvoice时传入CallBackHandler的匿名类实现对象,即可完成解耦操作,如下为调用方代码

rabbitProducer.sendLotteryInvoice(invoiceVO, new RabbitProducer.CallBackHandler() {
            @Override
            public void onSuccess() {
                // 4.1 MQ 消息发送完成,更新数据库表 user_strategy_export.mq_state = 1
                activityPartake.updateInvoiceMqState(invoiceVO.getUId(), invoiceVO.getOrderId(), Constants.MQState.COMPLETE.getCode());
            }

            @Override
            public void onExchangeException() {
                logger.info("消息发送到交换机失败,invoice:{}",invoiceVO);
                // 4.2 MQ 消息发送失败,更新数据库表 user_strategy_export.mq_state = 2 【等待定时任务扫码补偿MQ消息】
                activityPartake.updateInvoiceMqState(invoiceVO.getUId(), invoiceVO.getOrderId(), Constants.MQState.FAIL.getCode());
            }

            @Override
            public void onQueueException() {
                logger.info("消息发送到队列失败,invoice:{}",invoiceVO);
                // 4.2 MQ 消息发送失败,更新数据库表 user_strategy_export.mq_state = 2 【等待定时任务扫码补偿MQ消息】
                activityPartake.updateInvoiceMqState(invoiceVO.getUId(), invoiceVO.getOrderId(), Constants.MQState.FAIL.getCode());
            }


        });

消费者实例

消费者需要在消费处理方法上,使用@RabbitListener(queues = "lottery_activityPartake_queue")注解,queues指定的是需要监听的消息队列

@Component
public class LotteryActivityPartakeRecordListener {

    private Logger logger = LoggerFactory.getLogger(LotteryActivityPartakeRecordListener.class);

    @Resource
    private IActivityPartake activityPartake;

    /**
     * 监听队列
     */
    @RabbitListener(queues = "lottery_activityPartake_queue")
    public void process(Message message, Channel channel) {
        // 获取消息内容
        byte[] messageByte = message.getBody();
        String messageBody = new String(messageByte);

        // 判断消息是否为空
        if (StringUtils.isEmpty(messageBody)) {
            return;
        }

        // 处理MQ消息
        try{
            // 1. 转化对象
            ActivityPartakeRecordVO activityPartakeRecordVO = JSON.parseObject(messageBody, ActivityPartakeRecordVO.class);
            logger.info("消费MQ消息,异步扣减活动库存 message:{}", messageBody);

            // 2. 更新数据库库存【实际场景业务体量较大,可能也会由于MQ消费引起并发,对数据库产生压力,所以如果并发量较大,可以把库存记录缓存中,并使用定时任务进行处理缓存和数据库库存同步,减少对数据库的操作次数】
            activityPartake.updateActivityStock(activityPartakeRecordVO);

        } catch (Exception e) {
            // 发奖环节失败,消息重试。所有到环节,发货、更新库,都需要保证幂等。
            logger.error("消费MQ消息,失败 topic:{} message:{}", "lottery_invoice", messageBody);
            System.out.printf(e.getMessage());
        }
    }
}