作者 | zvalhu
Golang基于多線程、協(xié)程實現(xiàn),與生俱來適合異步編程,當我們遇到那種需要批量處理且耗時的操作時,傳統(tǒng)的線性執(zhí)行就顯得吃力,這時就會想到異步并行處理。下面介紹一些異步編程方式和技巧。

func main() { go func() { fmt.Println("hello world1") }() go func() { fmt.Println("hello world2") }()}或者:
func main() { go Announce("hello world1") go Announce("hello world2")}func Announce(message string) { fmt.Println(message)}使用匿名函數(shù)傳遞參數(shù)
data := "Hello, World!"go func(msg string) { // 使用msg進行異步任務邏輯處理 fmt.Println(msg)}(data)這種方式不需要考慮返回值問題,如果要考慮返回值,可以使用下面的方式。
ch := make(chan int, 1) // 創(chuàng)建一個帶緩沖的channel// ch := make(chan int, 0) // 創(chuàng)建一個無緩沖的channelgo func() { // 異步任務邏輯 ch <- result // 將結(jié)果發(fā)送到channel // 異步任務邏輯 close(ch) // 關(guān)閉channel,表示任務完成}()// 在需要的時候從channel接收結(jié)果result := <-chsync.WaitGroup用于等待一組協(xié)程完成其任務。通過Add()方法增加等待的協(xié)程數(shù)量,Done()方法標記協(xié)程完成,Wait()方法阻塞直到所有協(xié)程完成。
var wg sync.WaitGroup// 啟動多個協(xié)程for i := 0; i < 5; i++ { wg.Add(1) go func(index int) { defer wg.Done() // 異步任務邏輯 }(i)}// 等待所有協(xié)程完成wg.Wait()如果想簡單獲取協(xié)程返回的錯誤,errgroup包很適合,errgroup包是Go語言標準庫中的一個實用工具,用于管理一組協(xié)程并處理它們的錯誤。可以使用errgroup.Group結(jié)構(gòu)來跟蹤和處理協(xié)程組的錯誤。
var eg errgroup.Groupfor i := 0; i < 5; i++ { eg.Go(func() error { return errors.New("error") }) eg.Go(func() error { return nil })}if err := eg.Wait(); err != nil { // 處理錯誤}range操作可以在接收通道上迭代值,直到通道關(guān)閉。可以使用close函數(shù)關(guān)閉通道,以向接收方指示沒有更多的值。
ch := make(chan int)go func() { for i := 0; i < 5; i++ { ch <- i // 發(fā)送值到通道 } close(ch) // 關(guān)閉通道}()// 使用range迭代接收通道的值for val := range ch { // 處理接收到的值}ch1 := make(chan int)ch2 := make(chan string)go func() { // 異步任務1邏輯 ch1 <- result1}()go func() { // 異步任務2邏輯 ch2 <- result2}()// 在主goroutine中等待多個異步任務完成select {case res1 := <-ch1: // 處理結(jié)果1case res2 := <-ch2: // 處理結(jié)果2}如果需要在異步操作中設置超時,可以使用select語句結(jié)合time.After()函數(shù)實現(xiàn)。
ch := make(chan int)go func() { // 異步任務邏輯 time.Sleep(2 * time.Second) ch <- result}()// 設置超時時間select {case res := <-ch: // 處理結(jié)果case <-time.After(3 * time.Second): // 超時處理}如果需要在異步操作中設置超時,可以使用select語句結(jié)合time.After()函數(shù)實現(xiàn)。
ch := make(chan int)go func() { // 異步任務邏輯 time.Sleep(2 * time.Second) ch <- result}()// 設置超時時間select {case res := <-ch: // 處理結(jié)果case <-time.After(3 * time.Second): // 超時處理}time.Tick()函數(shù)返回一個通道,定期發(fā)送時間值,可以用于執(zhí)行定時操作。time.After()函數(shù)返回一個通道,在指定的時間后發(fā)送一個時間值。
tick := time.Tick(1 * time.Second) // 每秒執(zhí)行一次操作for { select { case <-tick: // 執(zhí)行定時操作 }}select {case <-time.After(5 * time.Second): // 在5秒后執(zhí)行操作}當多個協(xié)程并發(fā)訪問共享數(shù)據(jù)時,需要確保數(shù)據(jù)訪問的安全性。sync.Mutex和sync.RWMutex提供了互斥鎖和讀寫鎖,用于在訪問共享資源之前進行鎖定,以避免數(shù)據(jù)競爭。sync.RWMutex是一種讀寫鎖,可以在多個協(xié)程之間提供對共享資源的并發(fā)訪問控制。多個協(xié)程可以同時獲取讀鎖,但只有一個協(xié)程可以獲取寫鎖。
var mutex sync.Mutexvar data int// 寫操作,使用互斥鎖保護數(shù)據(jù)mutex.Lock()data = 123mutex.Unlock()// 讀操作,使用讀鎖保護數(shù)據(jù)//RLock()加讀鎖時,如果存在寫鎖,則無法加讀鎖;當只有讀鎖或者沒有鎖時,可以加讀鎖,讀鎖可以加載多個mutex.RLock()value := datamutex.RUnlock()var rwMutex sync.RWMutexvar sharedData map[string]string// 讀操作,使用rwMutex.RLock讀鎖保護數(shù)據(jù)func readData(key string) string { rwMutex.RLock() defer rwMutex.RUnlock() return sharedData[key]}// 寫操作,使用rwMutex.Lock寫鎖保護數(shù)據(jù)func writeData(key, value string) { rwMutex.Lock() defer rwMutex.Unlock() sharedData[key] = value}注意:sync.Mutex 的鎖是不可以嵌套使用的 sync.RWMutex 的 RLock()是可以嵌套使用的 sync.RWMutex 的 mu.Lock() 是不可以嵌套的 sync.RWMutex 的 mu.Lock() 中不可以嵌套 mu.RLock()
sync.Cond是一個條件變量,用于在協(xié)程之間進行通信和同步。它可以在指定的條件滿足之前阻塞等待,并在條件滿足時喚醒等待的協(xié)程。
var cond = sync.NewCond(&sync.Mutex{})var ready boolgo func() { // 異步任務邏輯 ready = true // 通知等待的協(xié)程條件已滿足 cond.Broadcast()}()// 在某個地方等待條件滿足cond.L.Lock()for !ready { cond.Wait()}cond.L.Unlock()sync.Pool是一個對象池,用于緩存和復用臨時對象,可以提高對象的分配和回收效率。
type MyObject struct { // 對象結(jié)構(gòu)}var objectPool = sync.Pool{ New: func() interface{} { // 創(chuàng)建新對象 return &MyObject{} },}// 從對象池獲取對象obj := objectPool.Get().(*MyObject)// 使用對象// 將對象放回對象池objectPool.Put(obj)sync.Once用于確保某個操作只執(zhí)行一次,無論有多少個協(xié)程嘗試執(zhí)行它,常用于初始化或加載資源等場景。
var once sync.Oncevar resource *Resourcefunc getResource() *Resource { once.Do(func() { // 執(zhí)行初始化資源的操作,僅執(zhí)行一次 resource = initResource() }) return resource}// 在多個協(xié)程中獲取資源go func() { res := getResource() // 使用資源}()go func() { res := getResource() // 使用資源}()可以結(jié)合使用sync.Once和context.Context來確保在多個協(xié)程之間只執(zhí)行一次資源清理操作,并在取消或超時時進行清理。
var once sync.Oncefunc cleanup() { // 執(zhí)行資源清理操作}func doTask(ctx context.Context) { go func() { select { case <-ctx.Done(): once.Do(cleanup) // 只執(zhí)行一次資源清理 } }() // 異步任務邏輯}sync.Map是Go語言標準庫中提供的并發(fā)安全的映射類型,可在多個協(xié)程之間安全地進行讀寫操作。
var m sync.Map// 存儲鍵值對m.Store("key", "value")// 獲取值if val, ok := m.Load("key"); ok { // 使用值}// 刪除鍵m.Delete("key")context.Context用于在協(xié)程之間傳遞上下文信息,并可用于取消或超時控制。可以使用context.WithCancel()創(chuàng)建一個可取消的上下文,并使用context.WithTimeout()創(chuàng)建一個帶有超時的上下文。
ctx, cancel := context.WithCancel(context.Background())go func() { // 異步任務邏輯 if someCondition { cancel() // 取消任務 }}()// 等待任務完成或取消select {case <-ctx.Done(): // 任務被取消或超時}context.WithDeadline()和context.WithTimeout()函數(shù)可以用于創(chuàng)建帶有截止時間的上下文,以限制異步任務的執(zhí)行時間。
func doTask(ctx context.Context) { // 異步任務邏輯 select { case <-time.After(5 * time.Second): // 超時處理 case <-ctx.Done(): // 上下文取消處理 }}func main() { ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() go doTask(ctx) // 繼續(xù)其他操作}context.WithValue()函數(shù)可用于在上下文中傳遞鍵值對,以在協(xié)程之間共享和傳遞上下文相關(guān)的值。
type keyContextValue stringfunc doTask(ctx context.Context) { if val := ctx.Value(keyContextValue("key")); val != nil { // 使用上下文值 }}func main() { ctx := context.WithValue(context.Background(), keyContextValue("key"), "value") go doTask(ctx) // 繼續(xù)其他操作}atomic包提供了一組函數(shù),用于實現(xiàn)原子操作,以確保在并發(fā)環(huán)境中對共享變量的讀寫操作是原子的。
var counter int64func increment() { atomic.AddInt64(&counter, 1)}func main() { var wg sync.WaitGroup for i := 0; i < 100; i++ { wg.Add(1) go func() { defer wg.Done() increment() }() } wg.Wait() fmt.Println("Counter:", counter)}
本文鏈接:http://m.www897cc.com/showinfo-26-85231-0.htmlGolang異步編程方式和技巧
聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權(quán)等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。郵件:2376512515@qq.com