跳转到内容

Go 并发模式

来自代码酷

Go并发模式[编辑 | 编辑源代码]

Go并发模式是Go语言中用于管理和协调多个并发任务(goroutine)的设计模式。由于Go语言内置了强大的并发原语(如goroutine和channel),开发者可以利用这些特性构建高效、可扩展的并发程序。本章将详细介绍常见的Go并发模式,并通过代码示例和实际案例帮助初学者和高级用户理解这些模式的应用。

介绍[编辑 | 编辑源代码]

Go语言的并发模型基于CSP(Communicating Sequential Processes)理论,强调通过通信来共享内存,而不是通过共享内存来通信。Go的并发模式主要包括以下几种:

1. Generator模式:生成数据流供其他goroutine使用。 2. Fan-in/Fan-out模式:合并或分发多个goroutine的任务。 3. Worker Pool模式:限制并发goroutine的数量,避免资源耗尽。 4. Pipeline模式:将任务分解为多个阶段,每个阶段由独立的goroutine处理。 5. Select模式:多路复用channel操作,处理多个并发事件。

以下将逐一介绍这些模式,并提供代码示例。

Generator模式[编辑 | 编辑源代码]

Generator模式用于生成一系列值,供其他goroutine消费。通常通过一个返回channel的函数实现。

package main

import "fmt"

// 生成整数序列的Generator
func generateNumbers(n int) <-chan int {
    out := make(chan int)
    go func() {
        for i := 0; i < n; i++ {
            out <- i
        }
        close(out)
    }()
    return out
}

func main() {
    for num := range generateNumbers(5) {
        fmt.Println(num)
    }
}

输出:

0
1
2
3
4

Fan-in/Fan-out模式[编辑 | 编辑源代码]

Fan-out指一个goroutine分发任务给多个worker goroutines,而Fan-in指多个goroutine的结果合并到一个channel中。

Fan-out示例[编辑 | 编辑源代码]

func worker(id int, jobs <-chan int, results chan<- int) {
    for job := range jobs {
        fmt.Printf("Worker %d processing job %d\n", id, job)
        results <- job * 2
    }
}

func main() {
    jobs := make(chan int, 5)
    results := make(chan int, 5)

    // 启动3个worker
    for i := 1; i <= 3; i++ {
        go worker(i, jobs, results)
    }

    // 分发5个任务
    for j := 1; j <= 5; j++ {
        jobs <- j
    }
    close(jobs)

    // 收集结果
    for r := 1; r <= 5; r++ {
        fmt.Println("Result:", <-results)
    }
}

输出:

Worker 1 processing job 1
Worker 2 processing job 2
Worker 3 processing job 3
Worker 1 processing job 4
Worker 2 processing job 5
Result: 2
Result: 4
Result: 6
Result: 8
Result: 10

Fan-in示例[编辑 | 编辑源代码]

func merge(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // 从每个channel中读取数据并合并到out
    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            for n := range c {
                out <- n
            }
            wg.Done()
        }(ch)
    }

    // 关闭out channel
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

Worker Pool模式[编辑 | 编辑源代码]

Worker Pool模式通过固定数量的goroutine处理任务,避免无限制创建goroutine导致资源耗尽。

func workerPool(workerCount, jobCount int) {
    jobs := make(chan int, jobCount)
    results := make(chan int, jobCount)

    // 启动worker
    for i := 1; i <= workerCount; i++ {
        go func(id int) {
            for job := range jobs {
                results <- job * 2
            }
        }(i)
    }

    // 分发任务
    for j := 1; j <= jobCount; j++ {
        jobs <- j
    }
    close(jobs)

    // 收集结果
    for r := 1; r <= jobCount; r++ {
        fmt.Println("Result:", <-results)
    }
}

Pipeline模式[编辑 | 编辑源代码]

Pipeline模式将任务分解为多个阶段,每个阶段由独立的goroutine处理,并通过channel连接。

func stage1(nums []int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func stage2(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * 2
        }
        close(out)
    }()
    return out
}

func stage3(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n + 1
        }
        close(out)
    }()
    return out
}

func main() {
    nums := []int{1, 2, 3, 4, 5}
    for result := range stage3(stage2(stage1(nums))) {
        fmt.Println(result)
    }
}

输出:

3
5
7
9
11

Select模式[编辑 | 编辑源代码]

Select模式用于多路复用channel操作,处理多个并发事件。

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)

    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "Hello"
    }()

    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "World"
    }()

    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Println(msg1)
        case msg2 := <-ch2:
            fmt.Println(msg2)
        }
    }
}

输出:

Hello
World

实际案例[编辑 | 编辑源代码]

Web爬虫并发控制[编辑 | 编辑源代码]

使用Worker Pool模式限制并发爬取的goroutine数量:

func crawl(url string, wg *sync.WaitGroup, workerID int) {
    defer wg.Done()
    fmt.Printf("Worker %d crawling %s\n", workerID, url)
    // 模拟网络请求
    time.Sleep(1 * time.Second)
    fmt.Printf("Worker %d finished %s\n", workerID, url)
}

func main() {
    urls := []string{
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3",
        "https://example.com/page4",
    }

    var wg sync.WaitGroup
    maxWorkers := 2
    semaphore := make(chan struct{}, maxWorkers)

    for i, url := range urls {
        wg.Add(1)
        semaphore <- struct{}{}
        go func(id int, u string) {
            crawl(u, &wg, id)
            <-semaphore
        }(i+1, url)
    }

    wg.Wait()
}

实时数据处理Pipeline[编辑 | 编辑源代码]

使用Pipeline模式处理实时数据流:

graph LR A[数据源] --> B[过滤阶段] B --> C[转换阶段] C --> D[聚合阶段] D --> E[输出结果]

总结[编辑 | 编辑源代码]

Go并发模式为开发者提供了强大的工具来构建高效、可维护的并发程序。通过合理使用这些模式,可以避免常见的并发问题(如竞态条件、死锁等),同时充分利用多核CPU的计算能力。关键点包括:

  • 使用channel进行goroutine间通信
  • 避免共享内存,优先使用通信
  • 合理控制并发量(Worker Pool)
  • 使用Pipeline分解复杂任务
  • 利用Select处理多路channel操作

掌握这些模式后,开发者可以轻松应对各种并发编程挑战。