日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當前位置:首頁 > 科技  > 軟件

實戰與原理:如何基于RocketMQ實現分布式事務?

來源: 責編: 時間:2024-01-26 08:59:37 289觀看
導讀使用事務消息在DailyMart系統中,用戶發起支付后,訂單系統需要調用庫存服務執行庫存扣減邏輯。由于這是跨服務調用,因此會產生分布式事務。在這里,我們使用RocketMQ的事務消息來實現分布式事務。1、首先,在訂單服務的應用服

使用事務消息

在DailyMart系統中,用戶發起支付后,訂單系統需要調用庫存服務執行庫存扣減邏輯。圖片aJs28資訊網——每日最新資訊28at.com

由于這是跨服務調用,因此會產生分布式事務。在這里,我們使用RocketMQ的事務消息來實現分布式事務。aJs28資訊網——每日最新資訊28at.com

1、首先,在訂單服務的應用服務層處理支付邏輯,并調用RocketMQ發送事務消息:aJs28資訊網——每日最新資訊28at.com

@Overridepublic String payment(String orderSn) {    // todo 集成支付寶支付    // 支付流水號    String outOrderNo = IdUtils.get32UUID();    TradeOrder tradeOrder = Optional.ofNullable(tradeOrderService.getByOrderSn(orderSn)).orElseThrow(() -> new BusinessException("訂單編號不存在"));    // 如果訂單處于待支付狀態    if (Objects.equals(tradeOrder.getStatus(), OrderStatusEnum.WAITING_PAYMENT.getStatus())) {        OrderPaidEvent orderPaidEvent = new OrderPaidEvent(orderSn, outOrderNo);        TransactionSendResult sendResult = enhanceTemplate.sendTransaction("TRADE-ORDER", "ORDER-PAID");        if (SendStatus.SEND_OK == sendResult.getSendStatus() && sendResult.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {            return tradeOrder.getOrderSn();        } else {            throw new BusinessException("支付失敗...");        }    } else {        throw new BusinessException("訂單已支付,請勿重復提交...");    }}

aJs28資訊網——每日最新資訊28at.com

2、在訂單服務的基礎設施層,創建一個類實現 RocketMQLocalTransactionListener 接口:aJs28資訊網——每日最新資訊28at.com

該接口有兩個方法:aJs28資訊網——每日最新資訊28at.com

  • executeLocalTransaction:用于執行本地事務。
  • checkLocalTransaction:在RocketMQ執行消息回查時檢查本地事務執行結果,用于確定消息提交還是回滾。
@Component@Slf4jpublic class OrderPaidTransactionConsumer implements RocketMQLocalTransactionListener {        @Resource    private TransactionTemplate transactionTemplate;    @Resource    private TradeOrderService tradeOrderService;         /**     * 執行本地事務     * 將訂單狀態修改成已支付     */    @Override    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {                final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) message.getPayload(), OrderPaidEvent.class);        try {            // 放到同一個本地事務中            this.transactionTemplate.executeWithoutResult(status -> {                String orderSn = orderPaidEvent.getOrderSn();                // 修改成待發貨                tradeOrderService.changeOrderStatus(orderSn, OrderStatusEnum.AWAITING_SHIPMENT);            });            return RocketMQLocalTransactionState.COMMIT;        } catch (Exception e) {            log.error("修改訂單狀態失敗", e);            // ROLLBACK 則回滾消息,rocketmq將廢棄這條消息            return RocketMQLocalTransactionState.ROLLBACK;            // 如果是UNKNOWN, 則觸發回查        }    }    /**     * 檢查本地事務執行狀態     * 消息回查時,對于正在進行中的事務不要返回Rollback或Commit結果,應繼續保持Unknown的狀態。     */    @Override    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {        final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) message.getPayload(), OrderPaidEvent.class);        String orderSn = orderPaidEvent.getOrderSn();        TradeOrder tradeOrder = tradeOrderService.getByOrderSn(orderSn);        // 如果已經修改成待發貨說明本地事務執行成功,此時消費端可以直接消費        if (Objects.equals(tradeOrder.getStatus(), OrderStatusEnum.AWAITING_SHIPMENT.getStatus())) {            return RocketMQLocalTransactionState.COMMIT;        } else {            // 這里查不到的時候返回 UNKNOWN在于,有可能事務還沒有提交,回查就開始了            return RocketMQLocalTransactionState.UNKNOWN;        }    }}

aJs28資訊網——每日最新資訊28at.com

3、在庫存服務的基礎設施層,監聽消息以執行庫存扣減邏輯:aJs28資訊網——每日最新資訊28at.com

@Component@Slf4j@RocketMQMessageListener(consumerGroup = "dailymart_inventory_group", topic = "TRADE-ORDER", selectorExpression = "ORDER-PAID")public class InventoryDeductionConsumer extends EnhanceMessageHandler<OrderPaidEvent> implements RocketMQListener<OrderPaidEvent> {        @Resource    private InventoryDomainService inventoryDomainService;        @Override    public void onMessage(OrderPaidEvent orderPaidEvent) {        super.dispatchMessage(orderPaidEvent);    }        @Override    protected void handleMessage(OrderPaidEvent orderPaidEvent) throws Exception {        // 執行庫存扣減邏輯        String orderSn = orderPaidEvent.getOrderSn();        inventoryDomainService.deductionInventory(orderSn);    }}

通過以上步驟,我們完成了RocketMQ事務消息的發送,利用事務消息的特性保證分布式事務的最終一致性。與普通消息相比,事務消息在處理時需要實現 RocketMQLocalTransactionListener 接口,這是事務消息的核心。aJs28資訊網——每日最新資訊28at.com

介紹完事務消息的使用,接下來我們再來聊聊事務消息的原理。aJs28資訊網——每日最新資訊28at.com

事務消息的原理

首先,讓我們思考一下,如果不使用事務消息會有什么問題。aJs28資訊網——每日最新資訊28at.com

很容易想到的一個問題就是消息丟失。當保存訂單后由于網絡問題導致消息丟失,如下圖所示:aJs28資訊網——每日最新資訊28at.com

圖片圖片aJs28資訊網——每日最新資訊28at.com

在不使用RocketMQ的情況下,我們往往會通過 本地消息表 + 補償重試 的機制來保證消息一定會發送出去。其原理可以參考上篇文章 [Dailymart26:微服務中躲不過的坑 - 分布式事務]。aJs28資訊網——每日最新資訊28at.com

那RocketMQ是如何解決這個問題的呢?aJs28資訊網——每日最新資訊28at.com

1. 發送half消息,探測MQ是否正常

在基于RocketMQ的事務消息中,我們不是先執行自身的訂單支付邏輯,而是先讓訂單系統發送一條 half消息 到MQ去。這個half消息本質上是一個訂單支付成功的消息,只不過此時庫存系統是看不見這個half消息的。然后,我們等待接收這個half消息寫入成功的響應通知。aJs28資訊網——每日最新資訊28at.com

圖片圖片aJs28資訊網——每日最新資訊28at.com

發送half消息的本質其實是為了探測MQ是否仍然正常運行。但問題來了,如上所述,消息會發生丟失,那么half消息丟失怎么辦呢?aJs28資訊網——每日最新資訊28at.com

2. half消息發送失敗

在發送half消息時,由于網絡原因或者MQ直接掛了,就會導致half消息發送失敗。這個時候訂單系統需要執行一系列的回滾操作。在我們的場景中,應該執行退款操作,將錢退還給用戶,并告知用戶交易失敗。aJs28資訊網——每日最新資訊28at.com

3. half消息成功,訂單系統執行自己的業務邏輯

如果成功收到half消息的正常響應,此時訂單系統應該執行自己的業務邏輯。在我們這個場景中,就是修改訂單數據庫狀態,將其修改為待發貨狀態。這部分邏輯就對應上述代碼中的executeLocalTransaction()方法。aJs28資訊網——每日最新資訊28at.com

圖片圖片aJs28資訊網——每日最新資訊28at.com

4. 訂單本地事務執行失敗

如果訂單系統執行本地事務失敗,則需要發送一個rollback請求給MQ,讓其刪除這條half消息。aJs28資訊網——每日最新資訊28at.com

圖片圖片aJs28資訊網——每日最新資訊28at.com

5. 訂單本地事務執行成功

如果訂單系統的本地事務執行正常,此時需要發送一個commit請求給MQ,要求MQ對之前的half消息進行commit操作,這樣庫存系統就可以消費這條消息了。aJs28資訊網——每日最新資訊28at.com

圖片圖片aJs28資訊網——每日最新資訊28at.com

訂單創建消息處于half狀態時,庫存系統是看不見它的。必須等到訂單系統執行commit請求,消息被commit后,庫存系統才能看到并獲取這條消息進行后續處理。aJs28資訊網——每日最新資訊28at.com

6. half消息發送成功,但是沒收到half的響應

以上就是RocketMQ事務消息的正向流程。aJs28資訊網——每日最新資訊28at.com

然而,還有一個問題:如果訂單系統發送half消息成功后卻沒有收到half消息的響應,該如何處理呢?aJs28資訊網——每日最新資訊28at.com

在這種情況下,訂單系統可能會誤以為是發送half消息到MQ失敗了。訂單系統就會執行回滾流程,退還支付金額,關閉訂單。aJs28資訊網——每日最新資訊28at.com

圖片圖片aJs28資訊網——每日最新資訊28at.com

然而,此時MQ系統中已經存在了一條half消息。這條half消息又該如何處理呢?aJs28資訊網——每日最新資訊28at.com

在RocketMQ中,有一套補償流程。RocketMQ會定期掃描處于half狀態的消息。如果一直沒有對這個消息執行 commit/rollback 操作,超過了一定的時間,RocketMQ就會回調你的訂單系統的一個接口,用以確認你本地事務的情況。aJs28資訊網——每日最新資訊28at.com

當訂單系統收到MQ的回查請求時,就需要檢索一下數據庫,根據訂單狀態決定執行commit還是rollback。aJs28資訊網——每日最新資訊28at.com

這部分邏輯就對應上述代碼中checkLocalTransaction()方法。aJs28資訊網——每日最新資訊28at.com

圖片圖片aJs28資訊網——每日最新資訊28at.com

7. rollback 或者 commit 失敗怎么辦?

通過上述說明,可以看到,RocketMQ是根據rollback或commit操作來決定half消息的狀態的。如果業務系統執行了commit操作,則將half消息設置為可見,庫存系統可以消費;如果業務系統執行了rollback操作,MQ就會刪除half消息。那么問題來了:如果訂單系統在執行rollback或commit操作時失敗又該如何處理呢?aJs28資訊網——每日最新資訊28at.com

這時候仍然依賴于前文提到的回查機制。aJs28資訊網——每日最新資訊28at.com

由于此時MQ中的消息一直處于half狀態,超過一定的超時時間后,MQ會發現這個half消息有問題,然后回調你的訂單系統的接口。此時訂單系統需要根據訂單狀態來決定執行commit請求還是rollback請求。aJs28資訊網——每日最新資訊28at.com

以上,就是RocketMQ事務消息的原理。結合文章開頭的代碼,是不是已經很清晰了呢?aJs28資訊網——每日最新資訊28at.com

本文鏈接:http://m.www897cc.com/showinfo-26-68319-0.html實戰與原理:如何基于RocketMQ實現分布式事務?

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: Python Pathlib模塊:一站式解決文件路徑難題

下一篇: 我愛說實話,Mica-Http 超好用!

標簽:
  • 熱門焦點
Top 日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不
美腿丝袜亚洲色图| 亚洲欧美电影在线观看| 一区二区视频欧美| 亚洲精品国产欧美| 亚洲天堂网站在线观看视频| 欧美在线一区二区三区| 蜜桃av综合| 欧美欧美午夜aⅴ在线观看| 国产精品高潮视频| 经典三级久久| 亚洲天堂成人在线观看| 久久精品视频在线看| 欧美久久精品午夜青青大伊人| 国产精品捆绑调教| 在线免费观看日本欧美| 在线视频欧美日韩精品| 欧美制服丝袜| 欧美日韩www| 国产真实久久| 夜夜嗨av一区二区三区| 久久久亚洲一区| 欧美日韩一区二区三区免费看| 国产亚洲aⅴaaaaaa毛片| 亚洲精品在线免费观看视频| 欧美一级欧美一级在线播放| 欧美成人亚洲成人| 国产日韩在线视频| 一区二区三区视频在线看| 久久久久久高潮国产精品视| 欧美日韩在线一区二区| 亚洲大片在线观看| 羞羞视频在线观看欧美| 欧美日韩精品欧美日韩精品一| 国产在线欧美| 一区二区三区视频观看| 另类av导航| 国产视频在线观看一区二区三区| 日韩视频在线播放| 欧美日韩一区二区在线视频| 国产欧美日本| 一区二区三区高清不卡| 免费久久99精品国产自| 国产欧美一区二区三区沐欲| 日韩写真在线| 欧美成人一区二免费视频软件| 国产欧美日韩中文字幕在线| 一区二区三区精品国产| 欧美成在线观看| 激情欧美一区| 欧美专区亚洲专区| 国产精品久久二区二区| 99精品欧美一区| 欧美国产三区| 亚洲高清在线| 久久综合久久美利坚合众国| 国内精品免费午夜毛片| 亚洲欧美一区二区精品久久久| 欧美日韩一区二区三区| 亚洲精品乱码| 欧美成人一品| 91久久久国产精品| 奶水喷射视频一区| 影音先锋亚洲视频| 久久免费一区| 一区免费观看视频| 久久久午夜电影| 激情成人av| 久久久夜夜夜| 国内精品久久久久久久97牛牛| 欧美有码视频| 国产亚洲欧美一区二区| 久久国产精品色婷婷| 国产一区高清视频| 欧美在线视频免费观看| 国产欧美一区二区精品性色| 亚洲午夜羞羞片| 国产精品国产三级国产aⅴ9色| 亚洲一区3d动漫同人无遮挡| 国产精品久久久久久久久果冻传媒| 在线视频日韩| 国产精品久久久久久av下载红粉| 亚洲天堂av图片| 国产精品欧美日韩久久| 午夜亚洲精品| 黄色日韩网站| 美女精品在线| 亚洲日本中文字幕| 欧美人与禽猛交乱配视频| 日韩视频在线你懂得| 欧美揉bbbbb揉bbbbb| 亚洲无线观看| 国产九区一区在线| 欧美一区午夜精品| 一色屋精品视频免费看| 欧美不卡福利| 99热在线精品观看| 国产精品视频一二三| 久久国产欧美日韩精品| 原创国产精品91| 欧美精品一区二区三区很污很色的| 9国产精品视频| 国产精品免费aⅴ片在线观看| 欧美一区免费视频| 亚洲电影免费在线观看| 欧美精品一区二区在线播放| 在线一区二区视频| 国产欧美欧美| 欧美成人精品在线播放| 夜夜嗨网站十八久久| 国产精品一区二区在线观看网站 | 久久精品亚洲一区二区| 在线欧美亚洲| 欧美日韩国产色视频| 亚洲欧美日韩高清| 伊人精品在线| 欧美日韩在线播| 欧美一激情一区二区三区| 精品成人免费| 欧美日韩精品二区| 欧美一区二区三区四区夜夜大片| 亚洲国产高清在线观看视频| 欧美日韩国产另类不卡| 欧美在线高清| 亚洲日本中文字幕| 国产精品手机视频| 麻豆国产va免费精品高清在线| 99在线|亚洲一区二区| 国产日韩久久| 欧美国产视频日韩| 亚洲欧美一区二区精品久久久| 亚洲国产1区| 国产精品老牛| 欧美福利在线观看| 小黄鸭精品aⅴ导航网站入口| 亚洲国产欧美在线人成| 国产乱肥老妇国产一区二| 欧美大色视频| 性欧美暴力猛交另类hd| 亚洲韩国精品一区| 国产精品自拍在线| 欧美日韩不卡在线| 久久人人爽国产| 亚洲影院色无极综合| 亚洲国产精品尤物yw在线观看| 国产精品男人爽免费视频1| 麻豆成人91精品二区三区| 亚洲欧美日韩综合aⅴ视频| 亚洲狠狠丁香婷婷综合久久久| 国产欧美日韩视频在线观看 | 亚洲一级二级在线| 在线观看一区视频| 国产精品美女主播在线观看纯欲| 免费观看久久久4p| 羞羞漫画18久久大片| 日韩亚洲欧美成人一区| 极品少妇一区二区三区| 国产精品免费看| 欧美剧在线观看| 麻豆九一精品爱看视频在线观看免费| 亚洲一区二区三区精品视频| 亚洲精品国产无天堂网2021| 国内免费精品永久在线视频| 国产精品www网站| 欧美老女人xx| 蜜臀av性久久久久蜜臀aⅴ四虎| 欧美在线视频不卡| 亚洲免费婷婷| 一区二区av在线| 亚洲精品国产精品国产自| 狠狠色2019综合网| 国产欧美日韩免费| 国产精品看片资源| 欧美日韩综合网| 欧美国产日韩在线观看| 麻豆国产精品一区二区三区 | 欧美人体xx| 欧美激情视频在线播放| 开元免费观看欧美电视剧网站| 欧美一进一出视频| 亚洲免费一级电影| 亚洲一二三四区| 在线综合亚洲| 中文一区在线| 一区二区三区欧美日韩| 亚洲美女视频在线观看| 亚洲全黄一级网站| 亚洲成人在线观看视频| 狠狠狠色丁香婷婷综合久久五月 | 欧美高清视频一区| 欧美成人小视频| 欧美国产亚洲视频| 欧美成人按摩| 欧美va天堂va视频va在线| 狂野欧美激情性xxxx| 久久久亚洲国产美女国产盗摄| 久久久久久高潮国产精品视| 久久精品亚洲| 久久久久久9| 久久超碰97人人做人人爱| 久久国产精彩视频| 久久久久国色av免费观看性色| 久久精品最新地址| 久久人体大胆视频| 免费亚洲一区二区|