??先來(lái)看一個(gè)日常生活快遞寄件場(chǎng)景,從寄件人(寄件)到收件人(收件),全流程如下:
圖片
當(dāng)你準(zhǔn)備寄送一個(gè)包裹時(shí),通常你可以有兩種寄件方式:
??方案一、你親自前往快遞服務(wù)點(diǎn),填寫寄件單、交付包裹、等待工作人員處理,最后得到一張寄送單據(jù)。你必須在服務(wù)點(diǎn)等待直到所有步驟都完成。這個(gè)過(guò)程是同步的。
??方案二、你可以選擇在線預(yù)約快遞上門取件服務(wù),填寫相關(guān)信息后,你的請(qǐng)求就被提交給系統(tǒng)。此時(shí),你可以繼續(xù)進(jìn)行其他事情,而不需要等待快遞員到達(dá)。系統(tǒng)會(huì)在后臺(tái)異步處理你的請(qǐng)求,安排合適的快遞員前來(lái)取件。這樣,你就可以在等待的過(guò)程中做其他事情,無(wú)需阻塞在快遞服務(wù)點(diǎn)。
??這種寄件方式提高了效率,讓用戶可以更加靈活地安排自己的時(shí)間。在后臺(tái)系統(tǒng)中,快遞公司可以通過(guò)合理的任務(wù)調(diào)度,處理多個(gè)異步請(qǐng)求,提高寄件服務(wù)的整體吞吐量。這種方式類似于在后端異步處理任務(wù),而用戶無(wú)需等待任務(wù)完成,可以繼續(xù)進(jìn)行其他操作,提高了整個(gè)寄件過(guò)程的并發(fā)性和響應(yīng)性。這個(gè)過(guò)程就是異步。
我們通過(guò)這個(gè)例子抽象出同步模型和異步模型:
圖片
同步模型:一個(gè)任務(wù)做完做下一個(gè)任務(wù),阻塞
異步模型:做當(dāng)前任務(wù),只需要開啟而不需要關(guān)心另一個(gè)任務(wù)如何執(zhí)行,非阻塞
??有了上邊的模型,對(duì)于同步和異步的概念就有了初步的認(rèn)識(shí)。事實(shí)上,在架構(gòu)設(shè)計(jì)中,異步思想是指通過(guò)異步處理來(lái)提高系統(tǒng)的性能、可伸縮性和響應(yīng)速度。
以下是SpringColud微服務(wù)架構(gòu)的基本套件:
圖片
在架構(gòu)設(shè)計(jì)中,異步思想可以應(yīng)用在多個(gè)方面。常見(jiàn)的異步實(shí)踐包括:
......
??接下來(lái),我們針對(duì)實(shí)際項(xiàng)目中的異步設(shè)計(jì)逐個(gè)探究??赡茏霾坏矫婷婢愕?,但是可以為真實(shí)的場(chǎng)景中的方案設(shè)計(jì)打開思路。
??Spring Cloud Gateway基于Project Reactor反應(yīng)式編程和WebFlux框架,通過(guò)路由、過(guò)濾器、事件等機(jī)制實(shí)現(xiàn)了靈活的網(wǎng)關(guān)服務(wù)。它適用于構(gòu)建微服務(wù)架構(gòu)中的業(yè)務(wù)網(wǎng)關(guān),具有高性能、可擴(kuò)展性和豐富的功能。
官網(wǎng)地址:https://spring.io/projects/spring-cloud-gateway/
圖片
性能比較
??對(duì) Zuul/Spring Cloud Gateway 的一些性能分析可以參考 Spring Cloud Gateway 作者 Spencer Gibb 提供的項(xiàng)目:https://github.com/spencergibb/spring-cloud-gateway-bench。
圖片
摘自SpringCloud GateWay作者spencergibb 提供的一個(gè)壓測(cè)報(bào)告
??總的來(lái)說(shuō),Gateway在處理IO密集型請(qǐng)求場(chǎng)景下有著更大的優(yōu)勢(shì)。原因是: 隨著Spring 5 推出的WebFlux,它是完全異步且非阻塞的,底層也是基于Netty實(shí)現(xiàn)的。我們分別對(duì)Reactor模型和Netty做一個(gè)簡(jiǎn)單介紹。
圖片
??其中:mainReactor主要負(fù)責(zé)連接處理(不參與數(shù)據(jù)處理),而subReactor負(fù)責(zé)數(shù)據(jù)的讀?。ú粎⑴c連接). 不再是單線程模型那樣,接收請(qǐng)求和處理數(shù)據(jù)都是在一個(gè)Reactor下進(jìn)行。
核心主要是基于NIO的Netty框架,原理說(shuō)明如下:
組件關(guān)系:
圖片
概念說(shuō)明:
每個(gè)服務(wù)器中都會(huì)有一個(gè) Boss(老板),會(huì)有一群做事情的WorkerBoss(員工) 會(huì)不停地接收新的連接,將連接分配給一個(gè)個(gè) Worker 處理連接
執(zhí)行過(guò)程:
圖片
Netty 執(zhí)行過(guò)程:
關(guān)于SpringCloud GateWay的使用,請(qǐng)自行查閱官網(wǎng)。這里只介紹如何體現(xiàn)NIO異步非阻塞原理的。
場(chǎng)景分析:
??比如:商城首頁(yè)菜單樹。一般這種場(chǎng)景我們?cè)试S在一定時(shí)間數(shù)據(jù)不一致性。那么就可以使用定時(shí)任務(wù)+消息隊(duì)列。如每隔5分鐘同步一次,達(dá)到數(shù)據(jù)最終一致。
圖片
注意事項(xiàng):
??這種數(shù)據(jù)同步方案主要適用于數(shù)據(jù)實(shí)時(shí)性要求不高的場(chǎng)景,因?yàn)椋憾〞r(shí)任務(wù)處理存在一定時(shí)間間隔,會(huì)有同步延時(shí)。同時(shí)在時(shí)間窗口期數(shù)據(jù)可能發(fā)生變更。還有就是數(shù)據(jù)最終一致性的保證,主要取決于MQ的可靠性。
場(chǎng)景分析:
??三方平臺(tái)交互,上游系統(tǒng)(A)的數(shù)據(jù)和下游系統(tǒng)(B)的數(shù)據(jù)進(jìn)行接口規(guī)范轉(zhuǎn)化。此處可能涉及到很多業(yè)務(wù)轉(zhuǎn)到同一個(gè)平臺(tái)或者不同平臺(tái)。而我們接口轉(zhuǎn)化的功能是一致的。當(dāng)然你可以使用Feign直接調(diào)用。但是流量增加、網(wǎng)絡(luò)阻塞時(shí)可能會(huì)出現(xiàn)調(diào)用失敗,導(dǎo)致未能成功送達(dá)下游。因此我們可以這樣設(shè)計(jì):

注意事項(xiàng):
??這種異步設(shè)計(jì)一方面為了系統(tǒng)內(nèi)部服務(wù)之間解耦,另一方面起到了削峰填谷的作用。但是引入消息隊(duì)列和轉(zhuǎn)化服務(wù),增加了系統(tǒng)的復(fù)雜性。因?yàn)殒溌份^長(zhǎng),出現(xiàn)問(wèn)題時(shí)排查起來(lái)比較困難。因此要在數(shù)據(jù)庫(kù)中盡可能存留記錄明細(xì),方便審查。另外,也可能出現(xiàn)消息積壓等問(wèn)題。當(dāng)然這是消息隊(duì)列存在的共性問(wèn)題。
場(chǎng)景分析:
??日常我們會(huì)遇到很多這種發(fā)短信的情況。比如,
......
那么對(duì)于短信場(chǎng)景,我們?nèi)绾卧O(shè)計(jì)呢?
圖片
注意事項(xiàng):這種異步設(shè)計(jì)一方面為了將發(fā)送短信的功能獨(dú)立出來(lái)。
場(chǎng)景分析:
??在業(yè)務(wù)系統(tǒng)中,一般我們會(huì)進(jìn)行日志采集和可視化展示。ELK 是由 Elasticsearch、Logstash 和 Kibana 組成的一套日志管理和分析解決方案。結(jié)合 Kafka 使用時(shí),通常用于搭建一個(gè)高效的日志處理系統(tǒng)。
圖片
ELK 工作流程并結(jié)合 Kafka 的工作流程描述:
Logstash 作為 Kafka 消費(fèi)者,通過(guò) Kafka Input 插件訂閱一個(gè)或多個(gè) Kafka 主題。
Logstash 接收到 Kafka 中的日志消息后,可以進(jìn)行多種操作,如解析日志、添加字段、過(guò)濾、轉(zhuǎn)換格式等。
Logstash 處理日志并發(fā)送到 Elasticsearch:
Logstash 通過(guò) Elasticsearch Output 插件將處理后的日志數(shù)據(jù)發(fā)送到 Elasticsearch 集群。
Logstash 可以將日志數(shù)據(jù)根據(jù)配置的索引模式(Index Pattern)劃分到不同的索引中,以便更好地管理和查詢。
Elasticsearch 存儲(chǔ)和索引日志數(shù)據(jù):
Elasticsearch 接收 Logstash 發(fā)送過(guò)來(lái)的日志數(shù)據(jù),并將其存儲(chǔ)在分布式索引中。
Elasticsearch 提供了強(qiáng)大的全文搜索和分析功能,支持對(duì)大量的日志數(shù)據(jù)進(jìn)行高效的查詢和分析。
Kibana 可視化和查詢:
Kibana 作為 Elasticsearch 的前端界面,提供了豐富的可視化工具和查詢界面。
用戶可以使用 Kibana 創(chuàng)建儀表板、圖表,執(zhí)行復(fù)雜的查詢,實(shí)時(shí)監(jiān)控日志數(shù)據(jù)等。
整個(gè)工作流程如下:
+----------------------+ +----------------------+ +----------------------+| Producer | ----> | Kafka | ----> | Logstash | | (Log Generator) | | (Message Broker) | | | +----------------------+ +----------------------+ +----------+-----------+ | | v +----------------------+ | Elasticsearch | | (Log Storage) | +----------------------+ | | v +----------------------+ | Kibana | | (Visualization Tool)| +----------------------+注意事項(xiàng):
??整個(gè) ELK + Kafka 的架構(gòu)可以幫助實(shí)現(xiàn)高效的日志收集、處理和可視化,適用于大規(guī)模分布式系統(tǒng)中的日志管理。
??當(dāng)使用多線程和 CompletableFuture 來(lái)執(zhí)行批處理任務(wù)時(shí),可以通過(guò)將任務(wù)分成多個(gè)子任務(wù),并使用 CompletableFuture 來(lái)異步執(zhí)行這些子任務(wù)。主要思想如下:
圖片
假設(shè)我們有一個(gè)批處理任務(wù),需要對(duì)一組數(shù)據(jù)進(jìn)行處理:
import java.util.ArrayList;import java.util.List;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.Executors;import java.util.concurrent.ThreadPoolExecutor;public class BatchProcessingExample { public static void main(String[] args) { // 模擬一組數(shù)據(jù) List<Integer> data = generateData(10); // 定義線程池 ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5); // 將數(shù)據(jù)分成多個(gè)子任務(wù)進(jìn)行處理 List<CompletableFuture<Void>> futures = new ArrayList<>(); int batchSize = 3; for (int i = 0; i < data.size(); i += batchSize) { List<Integer> batch = data.subList(i, Math.min(i + batchSize, data.size())); CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { // 在這里執(zhí)行批處理的具體邏輯 processBatch(batch); }, executor); futures.add(future); } // 等待所有子任務(wù)完成 CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); // 在所有子任務(wù)完成后關(guān)閉線程池 allOf.thenRun(executor::shutdown); try { // 等待所有任務(wù)完成 allOf.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } private static List<Integer> generateData(int size) { List<Integer> data = new ArrayList<>(); for (int i = 1; i <= size; i++) { data.add(i); } return data; } private static void processBatch(List<Integer> batch) { // 模擬批處理邏輯 for (Integer value : batch) { System.out.println(Thread.currentThread().getName() + " - Processing: " + value); } }}??以上我們模擬了一組數(shù)據(jù),然后將數(shù)據(jù)分成多個(gè)批次,每個(gè)批次使用 CompletableFuture 異步執(zhí)行。CompletableFuture.allOf 用于等待所有子任務(wù)完成。
在這個(gè)示例中,主要體現(xiàn)了以下異步的思想和操作:
CompletableFuture.runAsync(() -> { // 執(zhí)行異步任務(wù)的邏輯}, executor);CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));allOf.thenRun(executor::shutdown);??這些異步操作幫助提高程序的并發(fā)性和響應(yīng)性,特別在處理批量任務(wù)時(shí),可以更有效地利用系統(tǒng)資源。異步編程模型能夠允許程序在等待某些操作完成的同時(shí)繼續(xù)執(zhí)行其他操作,從而提高系統(tǒng)的效率。
??異步設(shè)計(jì)在處理并發(fā)和提高系統(tǒng)性能方面具有優(yōu)勢(shì),但也帶來(lái)了一些可能的問(wèn)題。以上提供的場(chǎng)景和方案僅供參考。使用過(guò)程中應(yīng)當(dāng)根據(jù)業(yè)務(wù)特征合理選擇具體方案。
本文鏈接:http://m.www897cc.com/showinfo-26-70427-0.html聊聊項(xiàng)目實(shí)戰(zhàn)中的異步設(shè)計(jì)
聲明:本網(wǎng)頁(yè)內(nèi)容旨在傳播知識(shí),若有侵權(quán)等問(wèn)題請(qǐng)及時(shí)與本網(wǎng)聯(lián)系,我們將在第一時(shí)間刪除處理。郵件:2376512515@qq.com