本文給大家介紹一下在 Spring Boot 項目中如何集成消息隊列 RabbitMQ,包含對 RibbitMQ 的架構(gòu)介紹、應(yīng)用場景、坑點解析以及代碼實戰(zhàn)。最后文末有免費領(lǐng)取龍年紅包封面以及騰訊云社區(qū)答題領(lǐng)獎福利,歡迎大家領(lǐng)取。
我將使用 waynboot-mall 項目作為代碼講解,項目地址:https://github.com/wayn111/waynboot-mall。本文大綱如下,
圖片
圖片
RibbitMQ 是一個基于 AMQP 協(xié)議的開源消息隊列系統(tǒng),具有高性能、高可用、高擴展等特點。通常作為在系統(tǒng)間傳遞消息的中間件,它可以實現(xiàn)異步處理、應(yīng)用解耦、流量削峰等功能。
圖片
RibbitMQ 的主要組件介紹如下,
圖片
RabbitMQ 是一個非常強大和靈活的消息中間件,它可以應(yīng)用于多種場景和需求。以下是一些常見的 RabbitMQ 應(yīng)用場景和實戰(zhàn)經(jīng)驗:
圖片
在使用 RabbitMQ 的過程中,有一些常見的問題需要注意:
在 waynboot-mall 項目中,消息層包含兩個模塊 waynboot-message-core 以及 waynboot-message-consumer,目錄結(jié)構(gòu)如下,
|-- waynboot-message-core // 核心消息配置,供其他服務(wù)集成使用| |-- config| |-- constant| |-- dto|-- waynboot-message-consumer // 消息消費服務(wù),訂閱隊列接收消息,調(diào)用其他服務(wù)執(zhí)行一些具體的業(yè)務(wù)邏輯| |-- api| |-- config| |-- consumerwaynboot-message-core 包目錄說明如下,
waynboot-message-consumer 包目錄說明如下,
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>${spring-boot.version}</version></dependency>
圖片
在 waynboot-mall 項目中,通過 yml 文件的 spring.rabbitmq.virtual-host=“/” 屬性來指定虛擬主機名稱。
建議大家在使用 RabbitMQ 時都配置好自己項目的虛擬主機名稱,來達到各系統(tǒng)資源隔離的目的。當然如果 RabbitMQ 服務(wù)只有一個項目在用,那就用默認的 / 作為虛擬主機名稱也是可以的。
小知識:出于多租戶和安全因素設(shè)計的,vhost 把 AMQP 的基本組件劃分到一個虛擬的分組中。每個 vhost 本質(zhì)上就是一個 mini 版的 RabbitMQ 服務(wù)器,擁有自己的隊列、交換機、綁定和權(quán)限機制。當多個不同的用戶使用同一個 RabbitMQ 服務(wù)器時,可以劃分出多個虛擬主機。RabbitMQ 默認的虛擬主機路徑是 /。
在 waynboot-mall 項目中,用訂單消息來舉例,生產(chǎn)者發(fā)送消息需要經(jīng)過三個步驟
public class MQConstants { public static final String ORDER_DIRECT_QUEUE = "order_direct_queue"; public static final String ORDER_DIRECT_EXCHANGE = "order_direct_exchange"; public static final String ORDER_DIRECT_ROUTING = "order_direct_routing";}@Configurationpublic class BusinessRabbitConfig { @Bean public Queue orderDirectQueue() { return new Queue(MQConstants.ORDER_DIRECT_QUEUE); } @Bean DirectExchange orderDirectExchange() { return new DirectExchange(MQConstants.ORDER_DIRECT_EXCHANGE); } @Bean Binding bindingOrderDirect() { return BindingBuilder.bind(orderDirectQueue()).to(orderDirectExchange()).with(MQConstants.ORDER_DIRECT_ROUTING); }}在 BusinessRabbitConfig 中,我們創(chuàng)建了訂單交換機、隊列以及路由綁定關(guān)系。在 Spring 項目中,項目啟動時,就會自動在 RabbitMQ 服務(wù)器上創(chuàng)建好這些東西。
交換機列表
隊列列表

生產(chǎn)者的消息發(fā)送確認主要包含兩部分,
producter -> rabbitmq broker exchange -> queue
waynboot-mall 項目的 yml 中關(guān)于 RabbitMQ 的相關(guān)配置如下,
spring: # 配置rabbitMq 服務(wù)器 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest # 消息確認配置項 # 確認消息已發(fā)送到交換機(Exchange) publisher-confirm-type: correlated # 確認消息已發(fā)送到隊列(Queue) publisher-returns: true # 虛擬主機名稱 virtual-host: /可以看到,我們設(shè)置了 publisher-confirm-type 屬性為 correlated,表示開啟發(fā)布確認模式,用來確認消息已發(fā)送到交換機,publisher-confirm-type 有三個選項:
在 RabbitMQ 中,消息發(fā)送到交換機中也不代表消費者一定能接收到消息,所以我們還需要設(shè)置 publisher-returns 為 true 來表示確認交換機中消息已經(jīng)發(fā)送到隊列里。true 表示開啟失敗回調(diào),開啟后當消息無法路由到指定隊列時會觸發(fā) ReturnCallback 回調(diào)。
接著是 RabbitTemplateConfig 的代碼,這里面會定義前面提到的 confirmCallBack、returnCallBack 相關(guān)代碼,
@Slf4j@Componentpublic class RabbitTemplateConfig { @Bean public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // 設(shè)置開啟Mandatory,才能觸發(fā)回調(diào)函數(shù),無論消息推送結(jié)果怎么樣都強制調(diào)用回調(diào)函數(shù) rabbitTemplate.setMandatory(true); // 交換機收到消息回調(diào) rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息發(fā)送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause)); // 隊列收到消息回調(diào),如果失敗的話會進行 returnCallback 的回調(diào)處理,反之成功就不會回調(diào)。 rabbitTemplate.setReturnsCallback(returned -> { log.info("returnCallback: " + "消息:" + returned.getMessage()); log.info("returnCallback: " + "回應(yīng)碼:" + returned.getReplyCode()); log.info("returnCallback: " + "回應(yīng)信息:" + returned.getReplyText()); log.info("returnCallback: " + "交換機:" + returned.getExchange()); log.info("returnCallback: " + "路由鍵:" + returned.getRoutingKey()); }); return rabbitTemplate; }}在 RabbitTemplateConfig 類代碼里,我們可以設(shè)置 confirmCallBack、returnCallBack 回調(diào)函數(shù)后,監(jiān)控生產(chǎn)者發(fā)送消息是否被交換機接收、以及交換機是否把消息發(fā)送到隊列中。
在 Spring Boot 項目中,集成了 spring-boot-starter-amqp 依賴后,就可以直接注入 RabbitTemplate 來發(fā)送消息。
這里用 waynboot-mall 項目中的異步下單流程舉例,代碼如下,
@Slf4j@Service@AllArgsConstructorpublic class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements IOrderService { private RabbitTemplate rabbitTemplate; @Override public R asyncSubmit(OrderVO orderVO) { OrderDTO orderDTO = new OrderDTO(); ... // 開始異步下單 String uid = IdUtil.getUid(); // 1. 創(chuàng)建消息ID,確認機制發(fā)送消息時,需要給每個消息設(shè)置一個全局唯一 id,以區(qū)分不同消息,避免 ack 沖突 CorrelationData correlationData = new CorrelationData(uid); // 2. 創(chuàng)建消息載體 Message ,AMQP 規(guī)范中定義的消息承載類,用來在生產(chǎn)者和消費者之前傳遞消息 Map<String, Object> map = new HashMap<>(); map.put("order", orderDTO); map.put("notifyUrl", WaynConfig.getMobileUrl() + "/callback/order/submit"); try { Message message = MessageBuilder .withBody(JSON.toJSONString(map).getBytes(Constants.UTF_ENCODING)) .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .build(); // 3. 發(fā)送消息到 RabbitMQ 服務(wù)器,需要指定交換機、路由鍵、消息載體以及消息ID rabbitTemplate.convertAndSend(MQConstants.ORDER_DIRECT_EXCHANGE, MQConstants.ORDER_DIRECT_ROUTING, message, correlationData); } catch (UnsupportedEncodingException e) { log.error(e.getMessage(), e); } return R.success().add("actualPrice", actualPrice).add("orderSn", orderSn); }}waynboot-mall 項目中在使用 rabbitTemplate 發(fā)送消息時,按照如下步驟,大家可以參考
以上就是生產(chǎn)者發(fā)送消息時所有相關(guān)代碼了,接著我們看下消費者處理消息的相關(guān)代碼。
在 waynboot-mall 項目中,還是用訂單消息來舉例,消費者 yml 配置如下,
在 RabbitMQ 的消息消費環(huán)節(jié),需要注意的一點就是,如果需要確保消費者不出現(xiàn)漏消費,則需要開啟消費者的手動 ack 模式。
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest ... listener: simple: # 消息確認方式,其有三種配置方式,分別是none、manual(手動ack) 和auto(自動ack) 默認auto acknowledge-mode: manual # 一個消費者最多可處理的nack(未確認)消息數(shù)量,默認是250 prefetch: 250 # 設(shè)置消費者數(shù)量 concurrency: 1在 yml 文件的消費者配置中,acknowledge-mode 屬性用于指定消息確認模式,有三種模式:
如果消費者在消費的過程中沒有拋出異常,則自動確認。
當消費者消費的過程中拋出 AmqpRejectAndDontRequeueException 異常的時候,則消息會被拒絕,且該消息不會重回隊列。
當拋出 ImmediateAcknowledgeAmqpException 異常,消息會被確認。
如果拋出其他的異常,則消息會被拒絕,但是與前兩個不同的是,該消息會重回隊列,如果此時只有一個消費者監(jiān)聽該隊列,那么該消息重回隊列后又會推送給該消費者,會造成死循環(huán)的情況。
消費者配置中,prefetch 屬性用于指定消費者每次從隊列獲取的消息數(shù)量。
每個 customer 會在 MQ 預(yù)取一些消息放入內(nèi)存的 LinkedBlockingQueue 中進行消費,這個值越高,消息傳遞的越快,但非順序處理消息的風險更高。如果 ack 模式為 none,則忽略。
prefetch 默認值以前是 1,這可能會導(dǎo)致高效使用者的利用率不足。從 spring-amqp 2.0 版開始,默認的 prefetch 值是 250,這將使消費者在大多數(shù)常見場景中保持忙碌,從而提高吞吐量。
不過在有些情況下,尤其是處理速度比較慢的大消息,消息可能在內(nèi)存中大量堆積,消耗大量內(nèi)存;以及對于一些嚴格要求順序的消息,prefetch 的值應(yīng)當設(shè)置為 1。
對于低容量消息和多個消費者的情況(也包括單 listener 容器的 concurrency 配置)希望在多個使用者之間實現(xiàn)更均勻的消息分布,建議在手動 ack 下并設(shè)置 prefetch=1。
如果要保證消息的可靠不丟失,當 prefetch 大于 1 時,可能會出現(xiàn)因為服務(wù)宕機引起的數(shù)據(jù)丟失,故建議將 prefetch=1。
消費者配置中,concurrency 屬性設(shè)置的是對每個 listener 在初始化的時候設(shè)置的并發(fā)消費者的個數(shù)。在上面的 yml 配置中,cnotallow=1,即每個 Listener 容器將開啟一個線程去處理消息。在 2.0 以后的版本中,可以在注解中配置該參數(shù),實例代碼如下,
@RabbitListener(queues = MQConstants.ORDER_DIRECT_QUEUE, concurrency = "2")public void process(Channel channel, Message message) throws IOException { String body = new String(message.getBody()); log.info("OrderPayConsumer 消費者收到消息: {}", body); ...}在 waynboot-mall 項目中,消費者監(jiān)聽隊列代碼如下,
@Slf4j@Componentpublic class OrderPayConsumer { @Resource private RedisCache redisCache; @Resource private MobileApi mobileApi; @RabbitListener(queues = MQConstants.ORDER_DIRECT_QUEUE) public void process(Channel channel, Message message) throws IOException { // 1. 轉(zhuǎn)換訂單消息 String body = new String(message.getBody()); log.info("OrderPayConsumer 消費者收到消息: {}", body); // 2. 獲取消息ID String msgId = message.getMessageProperties().getHeader("spring_returned_message_correlation"); // 3. 獲取發(fā)送tag long deliveryTag = message.getMessageProperties().getDeliveryTag(); // 4. 消費消息冪等性處理 if (redisCache.getCacheObject(ORDER_CONSUMER_MAP.getKey()) != null) { // redis中包含該 key,說明該消息已經(jīng)被消費過 log.error("msgId: {},消息已經(jīng)被消費", msgId); channel.basicAck(deliveryTag, false);// 確認消息已消費 return; } try { // 5. 下單處理 mobileApi.submitOrder(body); // 6. 手動ack,消息成功確認 channel.basicAck(deliveryTag, false); // 7. 設(shè)置消息已被消費標識 redisCache.setCacheObject(ORDER_CONSUMER_MAP.getKey(), msgId, ORDER_CONSUMER_MAP.getExpireSecond()); } catch (Exception e) { channel.basicNack(deliveryTag, false, false); log.error(e.getMessage(), e); } }}waynboot-mall 項目中在使用 RabbitListener 注解消費消息時,按照如下步驟,大家可以參考
其中參數(shù) long deliveryTag 為消息的唯一序號也就是第三步獲取的發(fā)送 tag,第二個 boolean multiple 參數(shù)表示是否一次消費多條消息,false 表示只確認該序列號對應(yīng)的消息,true 則表示確認該序列號對應(yīng)的消息以及比該序列號小的所有消息,比如我先發(fā)送 2 條消息,他們的序列號分別為 2,3,并且他們都沒有被確認,還留在隊列中,那么如果當前消息序列號為 4,那么當 multiple 為 true,則序列號為 2、3 的消息也會被一同確認。
這篇文章給大家講解了在 Spring Boot 項目中如何集成消息隊列 RabbitMQ 用于業(yè)務(wù)邏輯解耦,有架構(gòu)介紹、應(yīng)用場景、坑點解析、代碼實戰(zhàn) 4 個部分,能帶領(lǐng)大家比較全面的了解一波 RabbitMQ。大家在自己的項目中如果需要引入 RabbitMQ 時,都可以參考本文的代碼實戰(zhàn)配置,幫助大家快速集成、避免踩坑。
本文鏈接:http://m.www897cc.com/showinfo-26-70467-0.htmlSpring Boot項目集成RabbitMQ實戰(zhàn)以及坑點講解
聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權(quán)等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。郵件:2376512515@qq.com
上一篇: 利用Nacos實現(xiàn)Seata事務(wù)模式(XA與AT)的快速配置與靈活切換
下一篇: 字節(jié)碼增強技術(shù),不止有 Java Proxy、 Cglib 和 Javassist 還有 Byte Buddy