日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當前位置:首頁 > 科技  > 軟件

完美解決,RocketMQ如何支持多事務消息?

來源: 責編: 時間:2024-02-04 17:24:39 295觀看
導讀今天我們將解決使用RocketMQ事務消息時可能遇到的一個常見問題:如何讓其支持多事務消息?1. 問題背景在實際開發中,我們常常會面臨多事務消息的場景,例如在DailyMart的訂單模塊中,用戶支付后需要調用庫存服務進行庫存扣減,而

今天我們將解決使用RocketMQ事務消息時可能遇到的一個常見問題:如何讓其支持多事務消息?gjm28資訊網——每日最新資訊28at.com

1. 問題背景

在實際開發中,我們常常會面臨多事務消息的場景,例如在DailyMart的訂單模塊中,用戶支付后需要調用庫存服務進行庫存扣減,而在訂單確認收貨后需要調用用戶服務實現積分贈送。這兩個業務邏輯都需要通過事務消息來保證分布式事務。gjm28資訊網——每日最新資訊28at.com

為了處理這種情況,我們可能會考慮在訂單模塊中創建兩個事務消息監聽器,分別用于處理庫存扣減和積分贈送的事務處理和事務回查。gjm28資訊網——每日最新資訊28at.com

@Component@Slf4j//處理訂單支付的事務監聽器public class OrderPaidTransactionListener implements RocketMQLocalTransactionListener {  @Override  public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {    ......    //處理訂單支付邏輯   }  @Override  public RocketMQLocalTransactionState checkLocalTransaction(Message message) {      ......      //檢查訂單處理邏輯   }}@Component@Slf4j//處理訂單收貨的事務監聽器public class OrderReceivedTransactionListener implements RocketMQLocalTransactionListener {  @Override  public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {    ......   }  @Override  public RocketMQLocalTransactionState checkLocalTransaction(Message message) {      ......   }}

然而,當我們信心滿滿地完成業務邏輯編寫并啟動服務時,可能會遇到如下錯誤:rocketMQTemplate already exists RocketMQLocalTransactionListenergjm28資訊網——每日最新資訊28at.com

圖片圖片gjm28資訊網——每日最新資訊28at.com

在rocketmq-spring-boot-starter版本低于2.1.0的項目中,可以使用多個 @RocketMQTransactionListener 監聽不同的 txProducerGroup 來發送不同類型的事務消息到topic。然而,從 RocketMQ-Spring 2.1.0 版本開始,注解 @RocketMQTransactionListener 不能設置 txProducerGroup、ak、sk,這些值均需與對應的 RocketMQTemplate 保持一致。通過閱讀源碼 RocketMQTransactionConfiguration#registerTransactionListener() 方法,也可得知在RocketMQ如果已經存在了 RocketMQTransactionListener 則會出現上述錯誤。gjm28資訊網——每日最新資訊28at.com

圖片圖片gjm28資訊網——每日最新資訊28at.com

2. 如何解決

為了在保證系統只有一個 RocketMQTransactionListener 的前提下實現多事務消息,我們可以將 RocketMQLocalTransactionListener 不處理具體業務邏輯,而是將其作為一個分發器使用。gjm28資訊網——每日最新資訊28at.com

在生產者發送事務消息時指定對應的事務處理器 ,并將事務處理器放置在消息頭上發送出去,在 RocketMQTransactionListener 中根據消息頭選擇具體的事務處理器來實現業務邏輯。gjm28資訊網——每日最新資訊28at.com

具體實現如下:gjm28資訊網——每日最新資訊28at.com

2.1 定義事務消息處理接口

首先,定義公共的事務消息處理接口,所有事務消息都實現此接口而非 RocketMQ 默認的 RocketMQLocalTransactionListener。gjm28資訊網——每日最新資訊28at.com

public interface TransactionMessageHandler {        /**    * 執行本地事務    * @param payload 消息體    * @param arg 參數    */    RocketMQLocalTransactionState executeLocalTransaction(Object payload, Object arg);        /**     * 檢查本地執行狀態     * @param payload 消息體     * @return 執行結果     */    RocketMQLocalTransactionState checkLocalTransaction(Object payload);    }

2.2 修改事務消息發送工具類,指定消息處理器

public <T extends RemoteDomainEvent> TransactionSendResult sendTransaction(String topic, String tag, T message, Class<? extends TransactionMessageHandler> transactionMessageListener) {    if(transactionMessageListener == null){    throw new IllegalArgumentException("transactionMessageListener must not null");  }    String destination = buildDestination(topic, tag);  Message<T> sendMessage = MessageBuilder.withPayload(message)    .setHeader(RocketMQHeaders.KEYS, message.getKey())    .setHeader(SOURCE_HEADER, message.getSource())    .setHeader(TRANSACTION_MESSAGE_HEADER, transactionMessageListener.getSimpleName())    .build();  TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, sendMessage, null);  log.info("[{}]事務消息[{}]發送結果[{}]", destination, JSONObject.toJSON(message),JSONObject.toJSON(sendResult));  return sendResult;}

2.3 修改RocketMQ事務消息監聽器

@Slf4j@RocketMQTransactionListenerpublic class DefaultRocketMQTransactionListener implements RocketMQLocalTransactionListener {        private final Map<String, TransactionMessageHandler> transactionMessageHandlerMap;        public DefaultRocketMQTransactionListener(Map<String, TransactionMessageHandler> transactionMessageHandlerMap) {        this.transactionMessageHandlerMap = transactionMessageHandlerMap;    }        @Override    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {        log.info("消費者收到事務消息[{}]", JSONObject.toJSON(message));        String listenerName = (String) message.getHeaders().get(MessageHeaderConstant.TRANSACTION_MESSAGE_HEADER);                if (null == listenerName) {            throw new RuntimeException("not params transactionMessageListener");        }                RocketMQLocalTransactionState state;        Object payload = message.getPayload();        try {            TransactionMessageHandler messageHandler = transactionMessageHandlerMap.get(listenerName);            if (null == messageHandler) {                throw new RuntimeException("not match condition TransactionMessageHandler");            }            state = messageHandler.executeLocalTransaction(payload, arg);        } catch (Exception e) {            log.error("rocket transaction message executeLocal error:{}", e.getMessage());            return RocketMQLocalTransactionState.ROLLBACK;        }                return state;    }        @Override    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {        log.info("消費者收到事務回查消息[{}]", JsonUtils.obj2String(message.getHeaders()));        String listenerName = (String) message.getHeaders().get(MessageHeaderConstant.TRANSACTION_MESSAGE_HEADER);        if (null == listenerName) {            throw new RuntimeException("not params transactionMessageListener");        }        RocketMQLocalTransactionState state;        try {            TransactionMessageHandler messageHandler = transactionMessageHandlerMap.get(listenerName);            if (null == messageHandler) {                throw new RuntimeException("not match condition TransactionMessageHandler");            }            state = messageHandler.checkLocalTransaction(message.getPayload());        } catch (Exception e) {            log.error("rocket transaction message executeLocal error:{}", e.getMessage());            return RocketMQLocalTransactionState.ROLLBACK;        }                return state;    }    }

在上述代碼中,根據消息頭中的TRANSACTION_MESSAGE_HEADER參數選擇對應的事務處理器來處理事務消息。gjm28資訊網——每日最新資訊28at.com

在 DailyMart 中有一個公共組件 dailymart-rocketmq-spring-boot-starter 專門用于 RocketMQ 消息發送監聽的封裝,因此我們也將事務消息的處理邏輯封裝到了此組件中。gjm28資訊網——每日最新資訊28at.com

圖片圖片gjm28資訊網——每日最新資訊28at.com

2.4 修改事務消息處理邏輯

所有的事務消息處理邏輯都實現 TransactionMessageHandler 接口,以訂單支付的處理邏輯為例:gjm28資訊網——每日最新資訊28at.com

@Component@Slf4jpublic class OrderPaidTransactionConsumer implements TransactionMessageHandler {        @Resource    private TransactionTemplate transactionTemplate;            @Override    public RocketMQLocalTransactionState executeLocalTransaction(Object payload, Object arg) {        final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) payload, OrderPaidEvent.class);        ...    }        @Override    public RocketMQLocalTransactionState checkLocalTransaction(Object payload) {        final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) payload, OrderPaidEvent.class);        ...    }    }

2.5 修改事務消息發送邏輯,指定事務處理器

TransactionSendResult sendResult = enhanceTemplate.sendTransaction("TRADE-ORDER", "ORDER-PAID", orderPaidEvent, OrderPaidTransactionConsumer.class);

小結

本文解決了在 RocketMQ 2.1.0 版本以后,無法簡單使用多個 @RocketMQTransactionListener 的問題。通過引入事務消息處理接口 TransactionMessageHandler,我們將原有的事務處理器改造成了一個分發器,使得在 DailyMart 項目中可以輕松處理多事務消息的場景。gjm28資訊網——每日最新資訊28at.com

本文鏈接:http://m.www897cc.com/showinfo-26-73332-0.html完美解決,RocketMQ如何支持多事務消息?

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: .NET 中優秀依賴注入框架Autofac看一篇就夠了

下一篇: 一場「跨時空」的小年夜直播:以數字技術助力戲曲「煥活」傳承

標簽:
  • 熱門焦點
Top 日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不
国产精品久久久久毛片大屁完整版 | 在线成人激情黄色| 激情国产一区| 亚洲人妖在线| 亚洲一区二区不卡免费| 久久久亚洲影院你懂的| 欧美日韩国产欧| 国产欧美一区二区视频| 亚洲国产裸拍裸体视频在线观看乱了中文| 日韩午夜av电影| 欧美一区影院| 欧美精品亚洲一区二区在线播放| 国产精品视频| 亚洲国产精品va在线观看黑人 | 亚洲欧美日韩一区二区三区在线观看 | 欧美成人免费全部观看天天性色| 欧美日韩国产免费| 国产亚洲亚洲| 亚洲免费福利视频| 久久国产精品久久久久久| 欧美激情一区三区| 国产一区在线播放| av不卡免费看| 久久综合色播五月| 国产伦精品一区二区三| 亚洲欧洲一区二区在线观看| 欧美一区二区三区免费在线看| 欧美福利一区二区| 国产午夜精品在线观看| 一区二区三区蜜桃网| 久久野战av| 国产欧美日韩在线视频| 日韩一级在线| 毛片一区二区| 国产色视频一区| 日韩小视频在线观看| 久久国产精品久久国产精品| 欧美日韩精品| 亚洲国产日韩一区二区| 久久黄色小说| 国产精品免费aⅴ片在线观看| 91久久精品一区二区别| 久久久xxx| 国产女人aaa级久久久级| 一本色道久久88精品综合| 老司机精品久久| 国产一区二区三区自拍| 午夜精品一区二区三区在线| 欧美日韩视频在线第一区| 亚洲高清资源综合久久精品| 久久精品中文字幕一区| 国产欧美va欧美不卡在线| 宅男精品视频| 欧美久久综合| 亚洲精品1区2区| 久久青草欧美一区二区三区| 国产乱子伦一区二区三区国色天香| 99这里有精品| 欧美亚洲视频| 欧美国产三级| 国产精品日韩精品| 久久福利资源站| 久久综合中文色婷婷| 国产精品香蕉在线观看| 一本综合精品| 欧美伦理91i| 亚洲日本一区二区| 你懂的视频一区二区| 狠狠久久五月精品中文字幕| 欧美一区在线看| 国产免费成人av| 亚洲欧美日韩在线观看a三区| 欧美视频中文字幕| 国产精品99久久久久久宅男 | 亚洲国产福利在线| 狂野欧美激情性xxxx欧美| 国内精品视频666| 久久精品免费播放| 国产伊人精品| 久久精品综合| 永久91嫩草亚洲精品人人| 久久蜜臀精品av| 在线免费观看视频一区| 麻豆精品视频| 在线精品视频免费观看| 免费观看在线综合| 亚洲激情一区二区| 欧美激情影音先锋| 一区二区三区不卡视频在线观看| 欧美精品激情blacked18| 日韩亚洲在线观看| 国产精品成人v| 亚洲欧美成人在线| 国产日韩欧美精品在线| 久久精选视频| 亚洲国产成人高清精品| 欧美精品成人91久久久久久久| 亚洲精品日韩在线| 欧美性大战xxxxx久久久| 亚洲欧美日韩精品在线| 国产人成精品一区二区三| 久久久久国产精品一区| 亚洲高清视频在线| 欧美日韩精品免费| 亚洲欧美偷拍卡通变态| 国产一区二区三区直播精品电影| 久久人人97超碰国产公开结果 | 韩国一区二区三区在线观看| 美脚丝袜一区二区三区在线观看| 亚洲人成毛片在线播放| 欧美午夜片欧美片在线观看| 亚洲欧美综合网| 国产最新精品精品你懂的| 免费日韩成人| 亚洲视频在线观看视频| 国产日韩欧美黄色| 欧美成人亚洲成人日韩成人| 一本色道久久综合亚洲精品按摩| 国产精品久久国产三级国电话系列| 欧美高清在线播放| 国产精品99久久久久久久女警| 欧美日韩综合视频网址| 久久久一区二区| 亚洲国产欧美日韩精品| 欧美日韩在线电影| 性色av一区二区三区| 在线观看91精品国产麻豆| 欧美精品一区二区视频| 性欧美长视频| 亚洲欧洲精品一区| 国产精品欧美日韩久久| 麻豆9191精品国产| 亚洲一区三区视频在线观看| 在线成人中文字幕| 国产精品wwwwww| 狼人天天伊人久久| 亚洲影院色无极综合| 又紧又大又爽精品一区二区| 欧美私人网站| 另类春色校园亚洲| 亚洲免费视频一区二区| 亚洲国产精品99久久久久久久久| 亚洲影院免费观看| 在线观看国产一区二区| 国产精品va在线| 美女网站在线免费欧美精品| 亚洲网站在线观看| 亚洲电影免费观看高清完整版在线 | 久久av在线看| 亚洲乱码国产乱码精品精可以看| 国产日韩欧美在线看| 欧美日产国产成人免费图片| 欧美影院成人| 久久天天综合| 欧美日韩中文字幕日韩欧美| 免费在线欧美黄色| 亚洲综合色视频| 99精品视频免费| 国产精品九九| 狂野欧美一区| 久久免费视频在线| 亚洲一级影院| 在线综合亚洲欧美在线视频| 亚洲国产高清一区| 欧美极品影院| 久久这里只有精品视频首页| 欧美一级日韩一级| 久久尤物视频| 国产精品入口福利| 亚洲欧美综合网| 亚洲电影免费在线观看| 国产精品影视天天线| 欧美日韩国产成人在线91| 久久久久久久尹人综合网亚洲| 亚洲无限av看| avtt综合网| 久久激情网站| 国产精品日日摸夜夜添夜夜av| 在线免费观看欧美| 欧美在线网站| 夜夜夜久久久| 99国产精品99久久久久久粉嫩| 亚洲精品一级| 亚洲欧美激情诱惑| 亚洲欧美日韩一区二区| 在线视频免费在线观看一区二区| 亚洲国产一区二区a毛片| 欧美日韩中文字幕在线| 国产视频自拍一区| 亚洲大片在线| 亚洲精品国产精品国自产在线 | 91久久精品国产91久久性色| 国产一区在线看| 国产精品一二三四区| 欧美午夜在线| 欧美性做爰毛片| 欧美性事在线| 亚洲精选中文字幕| 欧美色区777第一页| 黄色日韩精品| 国内精品嫩模av私拍在线观看| 亚洲国产另类 国产精品国产免费| 亚洲另类一区二区| 亚洲私拍自拍|