日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當前位置:首頁 > 科技  > 軟件

異步Rust:構建實時消息代理服務器

來源: 責編: 時間:2024-02-01 12:44:22 278觀看
導讀在本文中,我們將深入研究使用Rust構建實時消息代理服務器,展示其強大的并發特性。我們將使用Warp作為web服務器,并使用Tokio來管理異步任務。此外,我們將創建一個WebSocket客戶端來測試代理服務器的功能。設計圖如下:圖片

在本文中,我們將深入研究使用Rust構建實時消息代理服務器,展示其強大的并發特性。我們將使用Warp作為web服務器,并使用Tokio來管理異步任務。此外,我們將創建一個WebSocket客戶端來測試代理服務器的功能。0Za28資訊網——每日最新資訊28at.com

設計圖如下:0Za28資訊網——每日最新資訊28at.com

圖片圖片0Za28資訊網——每日最新資訊28at.com

0Za28資訊網——每日最新資訊28at.com

構建消息代理服務器

消息代理服務器允許客戶端為主題生成事件并訂閱它們。它使用Warp作為HTTP和WebSocket服務器,使用Tokio作為異步運行時。0Za28資訊網——每日最新資訊28at.com

使用以下命令創建一個Rust項目:0Za28資訊網——每日最新資訊28at.com

cargo new real-ime-message

在Cargo.toml文件中加入以下依賴項:0Za28資訊網——每日最新資訊28at.com

[dependencies]futures-util = "0.3.30"tokio = {version = "1.35.1", features = ["full"]}tokio-tungstenite = "0.21.0"url = "2.5.0"warp = "0.3.6"

在src/main.rs文件中定義一個Broker結構體:0Za28資訊網——每日最新資訊28at.com

use std::{    collections::{HashMap, VecDeque},    sync::Arc,};use futures_util::{SinkExt, StreamExt};use tokio::sync::{    mpsc::{self, UnboundedSender},    RwLock,};use warp::{filters::ws::Message, Filter};type Topic = String;type Event = String;type WsSender = UnboundedSender<warp::ws::Message>;struct Broker {    events: Arc<RwLock<HashMap<Topic, VecDeque<Event>>>>,    subscribers: Arc<RwLock<HashMap<Topic, Vec<WsSender>>>>,}
  • events:存儲每個主題的事件。
  • subscribers:跟蹤每個主題的訂閱者。

創建一個新的Broker實例:0Za28資訊網——每日最新資訊28at.com

impl Broker {    fn new() -> Self {        Broker {            events: Arc::new(RwLock::new(HashMap::new())),            subscribers: Arc::new(RwLock::new(HashMap::new())),        }    }}

定義發布事件的方法produce:0Za28資訊網——每日最新資訊28at.com

impl Broker {    ......    async fn produce(&self, topic: Topic, event: Event) {        let mut events = self.events.write().await;        events            .entry(topic.clone())            .or_default()            .push_back(event.clone());        // 異步通知所有訂閱者        let subscribers_list;        {            let subscribers = self.subscribers.read().await;            subscribers_list = subscribers.get(&topic).cloned().unwrap_or_default();        }        for ws_sender in subscribers_list {            // 將事件發送到WebSocket客戶端            let _ = ws_sender.send(warp::ws::Message::text(event.clone()));        }    }}

0Za28資訊網——每日最新資訊28at.com

這個方法主要是將事件添加到相應的主題,然后將新事件通知所有訂閱者。0Za28資訊網——每日最新資訊28at.com

定義subscribe方法,來管理新的訂閱:0Za28資訊網——每日最新資訊28at.com

impl Broker {    ......    pub async fn subscribe(&self, topic: Topic, socket: warp::ws::WebSocket) {        let (ws_sender, mut ws_receiver) = socket.split();        let (tx, mut rx) = mpsc::unbounded_channel::<Message>();        {            let mut subs = self.subscribers.write().await;            subs.entry(topic).or_default().push(tx);        }        tokio::task::spawn(async move {            while let Some(result) = ws_receiver.next().await {                match result {                    Ok(message) => {                        // 處理有效的消息                        if message.is_text() {                            println!(                                "Received message from client: {}",                                message.to_str().unwrap()                            );                        }                    }                    Err(e) => {                        // 處理錯誤                        eprintln!("WebSocket error: {:?}", e);                        break;                    }                }            }            println!("WebSocket connection closed");        });        tokio::task::spawn(async move {            let mut sender = ws_sender;            while let Some(msg) = rx.recv().await {                let _ = sender.send(msg).await;            }        });    }}

0Za28資訊網——每日最新資訊28at.com

這個方法主要是將WebSocket拆分為發送方和接收方,將訂閱者添加到訂閱者列表中,處理傳入的WebSocket消息。0Za28資訊網——每日最新資訊28at.com

main函數代碼如下:0Za28資訊網——每日最新資訊28at.com

#[tokio::main]async fn main() {    let broker = Arc::new(Broker::new());    let broker_clone1 = Arc::clone(&broker);    let broker_clone2 = Arc::clone(&broker);    let produce = warp::path!("produce" / String)        .and(warp::post())        .and(warp::body::json())        .and(warp::any().map(move || Arc::clone(&broker_clone1)))        .and_then(            move |topic: String, event: Event, broker_clone2: Arc<Broker>| async move {                broker_clone2.produce(topic, event).await;                Ok::<_, warp::Rejection>(warp::reply())            },        );    let subscribe = warp::path!("subscribe" / String).and(warp::ws()).map(        move |topic: String, ws: warp::ws::Ws| {            let broker_clone3 = Arc::clone(&broker_clone2);            ws.on_upgrade(move |socket| async move {                broker_clone3.subscribe(topic.clone(), socket).await;            })        },    );    let routes = produce.or(subscribe);    println!("Broker server running at http://127.0.0.1:3030");    warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;}

0Za28資訊網——每日最新資訊28at.com

0Za28資訊網——每日最新資訊28at.com

實現WebSocket客戶端

WebSocket客戶端將模擬一個訂閱主題和接收消息的真實用戶。0Za28資訊網——每日最新資訊28at.com

在src/bin目錄下,創建一個ws_cli.rs文件。在文件中定義websocket_client函數,建立WebSocket連接并管理消息:0Za28資訊網——每日最新資訊28at.com

use futures_util::{sink::SinkExt, stream::StreamExt};use std::sync::Arc;use tokio::sync::RwLock;use tokio::time::{sleep, Duration};use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};use url::Url;async fn websocket_client(topic_url: &str) {    // 解析要連接WebSocket服務器的URL    let url = Url::parse(topic_url).expect("Invalid URL");    // 連接到WebSocket服務器    let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");    println!("WebSocket client connected");    let (mut write, mut read) = ws_stream.split();    let message = Arc::new(RwLock::new(String::new()));    let message_1 = message.clone();    // 生成一個任務來處理傳入的消息    tokio::spawn(async move {        let msg_lock = message_1.clone();        while let Some(message) = read.next().await {            match message {                Ok(msg) => {                    let mut ms = msg_lock.write().await;                    *ms = msg.to_text().unwrap().to_string();                    println!("Received message: {}", msg.to_text().unwrap());                }                Err(e) => {                    eprintln!("Error receiving message: {:?}", e);                    break;                }            }        }    });    // 發送消息    loop {        let msg_lock = message.clone();        let ms = msg_lock.read().await;        if let Err(e) = write.send(Message::Text(ms.to_string())).await {            eprintln!("Error sending message: {:?}", e);            break;        }        sleep(Duration::from_secs(5)).await;    }}

0Za28資訊網——每日最新資訊28at.com

main函數代碼如下:0Za28資訊網——每日最新資訊28at.com

#[tokio::main]async fn main() {    websocket_client("ws://127.0.0.1:3030/subscribe/newtopic").await;}

0Za28資訊網——每日最新資訊28at.com

0Za28資訊網——每日最新資訊28at.com

測試

執行如下命令運行消息代理服務器:0Za28資訊網——每日最新資訊28at.com

cargo run --bin real-ime-message

0Za28資訊網——每日最新資訊28at.com

執行結果:0Za28資訊網——每日最新資訊28at.com

Broker server running at http://127.0.0.1:3030

0Za28資訊網——每日最新資訊28at.com

0Za28資訊網——每日最新資訊28at.com

然后打開一個新的命令行,執行如下命令運行WebSocket客戶端:0Za28資訊網——每日最新資訊28at.com

cargo run --bin ws_cli

0Za28資訊網——每日最新資訊28at.com

執行結果:0Za28資訊網——每日最新資訊28at.com

WebSocket client connected

0Za28資訊網——每日最新資訊28at.com

0Za28資訊網——每日最新資訊28at.com

向http://127.0.0.1:3030/produce/newtopic接口發送post請求,如圖:0Za28資訊網——每日最新資訊28at.com

圖片圖片0Za28資訊網——每日最新資訊28at.com

0Za28資訊網——每日最新資訊28at.com

客戶端接收到消息:0Za28資訊網——每日最新資訊28at.com

WebSocket client connectedReceived message: This is a new event

0Za28資訊網——每日最新資訊28at.com

0Za28資訊網——每日最新資訊28at.com

0Za28資訊網——每日最新資訊28at.com

總結

我們已經探索了在Rust中創建一個簡單的消息代理,并使用WebSocket客戶端對其進行測試。這個例子突出了Rust在構建高效、并發的網絡應用程序方面的能力。0Za28資訊網——每日最新資訊28at.com

0Za28資訊網——每日最新資訊28at.com

本文鏈接:http://m.www897cc.com/showinfo-26-70392-0.html異步Rust:構建實時消息代理服務器

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: 分享 15 個 HTML 新特性,大多數人可能不知道,建議盡早使用上

下一篇: PHP 高性能的事件循環庫 Revolt

標簽:
  • 熱門焦點
Top 日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不
欧美一区二视频| 国产资源精品在线观看| 久久精品国产久精国产爱| 欧美一级成年大片在线观看| 新片速递亚洲合集欧美合集 | 亚洲欧美日韩国产综合精品二区| 久久亚洲国产精品日日av夜夜| 久久一日本道色综合久久| 欧美顶级大胆免费视频| 欧美日韩一视频区二区| 国产欧美高清| 黄色国产精品一区二区三区| 亚洲激情一区二区| 亚洲线精品一区二区三区八戒| 欧美一区中文字幕| 女女同性女同一区二区三区91| 欧美日韩 国产精品| 国产精品伊人日日| 亚洲国产精品久久精品怡红院| 日韩午夜三级在线| 午夜影视日本亚洲欧洲精品| 久久综合网hezyo| 欧美视频专区一二在线观看| 国产欧美日韩精品专区| 亚洲黄色天堂| 亚洲女与黑人做爰| 蜜乳av另类精品一区二区| 欧美三区不卡| 激情六月综合| 亚洲一区二区免费看| 久久久亚洲人| 国产精品高精视频免费| 在线精品国产成人综合| 亚洲一区免费观看| 久久综合亚洲社区| 国产精品久久久久久五月尺| 亚洲成色www8888| 亚洲高清网站| 性久久久久久久久| 欧美了一区在线观看| 国产一区二区中文| 一区二区三区精品视频在线观看| 久久久国产精品亚洲一区| 欧美日韩午夜剧场| 樱桃成人精品视频在线播放| 亚洲视频一区在线观看| 免费在线成人av| 国产日韩在线播放| 正在播放欧美视频| 欧美3dxxxxhd| 黑人一区二区三区四区五区| 亚洲一区二区三区欧美| 欧美电影免费网站| 国产综合色产| 亚洲综合首页| 欧美精品首页| 亚洲高清免费视频| 久久久精品免费视频| 国产精品一区二区欧美| 99国产精品99久久久久久粉嫩| 久热爱精品视频线路一| 国产日韩精品电影| 亚洲一区二区三区免费在线观看| 欧美国产日韩一二三区| 国外成人性视频| 亚洲欧美国产毛片在线| 欧美日韩免费一区二区三区视频| 亚洲激情欧美| 老司机一区二区三区| 国产视频观看一区| 亚洲欧美久久久| 欧美色网一区二区| 亚洲免费观看高清在线观看| 免费亚洲婷婷| 在线免费观看欧美| 久久免费一区| 黄色成人在线免费| 久久久久久999| 国产一区二区三区在线播放免费观看| 亚洲一区激情| 国产精品v日韩精品| 夜夜嗨av一区二区三区四季av| 欧美黄色影院| 亚洲日韩视频| 欧美精品亚洲| 亚洲精品亚洲人成人网| 欧美成人影音| 亚洲麻豆国产自偷在线| 欧美精品久久一区| 亚洲美女黄网| 欧美日韩亚洲一区二区三区四区| 99re6热只有精品免费观看| 欧美国产日韩一区| 亚洲毛片播放| 欧美三级在线| 亚洲一级在线| 国产精品视频yy9099| 午夜视频在线观看一区二区三区| 国产精品夜夜夜| 欧美一区影院| 在线观看不卡av| 欧美1区2区视频| 国产精品捆绑调教| 欧美一级午夜免费电影| 国产在线日韩| 欧美va天堂在线| 欧美精品免费观看二区| 一本综合精品| 国产精品私房写真福利视频| 久久福利电影| 亚洲国产成人精品久久| 欧美激情一区二区三区不卡| 这里只有精品视频| 国产日韩亚洲欧美| 蜜臀99久久精品久久久久久软件| 亚洲国产免费| 国产精品99免视看9| 日韩一区二区福利| 国产精品女人网站| 久久久噜噜噜久久中文字幕色伊伊 | 亚洲专区一区| 国产三级精品三级| 麻豆成人在线播放| 999在线观看精品免费不卡网站| 国产精品久久久久久福利一牛影视| 午夜精品成人在线| 狠狠色综合网| 暖暖成人免费视频| 亚洲天堂av图片| 国产精一区二区三区| 久久成人免费电影| 亚洲黑丝一区二区| 欧美日韩免费在线观看| 欧美亚洲色图校园春色| 精品91在线| 欧美日韩亚洲综合一区| 亚洲一区国产视频| 国内综合精品午夜久久资源| 欧美连裤袜在线视频| 亚洲视频每日更新| 国产视频一区三区| 欧美成人亚洲| 亚洲精品中文字幕有码专区| 国产精品国产精品国产专区不蜜| 欧美在线一区二区| 亚洲国产成人久久| 欧美日韩综合视频网址| 久久精品在线| 国内成+人亚洲+欧美+综合在线| 免费久久99精品国产| 99视频在线精品国自产拍免费观看| 国产精品美女久久| 久久蜜桃精品| 99精品国产在热久久婷婷| 国产精品一页| 美女久久网站| 亚洲一二三区在线| 亚洲黄色毛片| 国产精品网站在线| 模特精品裸拍一区| 亚洲免费网址| 国产一区二区无遮挡| 欧美日韩国产成人高清视频| 欧美一区二区视频网站| 亚洲九九爱视频| 国产综合欧美| 欧美视频亚洲视频| 欧美 日韩 国产 一区| 亚洲小说区图片区| 在线观看日韩精品| 国产精品美女一区二区在线观看| 免费日韩av| 欧美一级在线亚洲天堂| 日韩视频二区| 在线成人黄色| 国产精品国产三级国产普通话99| 男男成人高潮片免费网站| 午夜精品999| 亚洲精品你懂的| 狠狠色丁香婷婷综合久久片| 国产精品理论片在线观看| 女女同性精品视频| 欧美在线视频免费| 亚洲一区二区三区三| 99视频精品全部免费在线| 精品成人在线观看| 国产精品一区久久久| 欧美激情中文字幕一区二区| 久久精品九九| 久久黄金**| 午夜影院日韩| 亚洲香蕉网站| 日韩午夜激情av| 日韩亚洲精品视频| 亚洲国产欧美日韩精品| 国产一区二区欧美| 国产精品日韩电影| 国产精品久久一区二区三区| 欧美日韩国产精品自在自线| 免费成人在线观看视频| 久久久久久久久久久久久9999| 欧美一区国产在线| 亚洲——在线| 国产精品99久久久久久白浆小说|