了解有缓冲 channel

已完成

正如你所了解的,默认情况下 channel 是无缓冲行为。 这意味着只有存在接收操作时,它们才接受发送操作。 否则,程序将永久被阻止等待。

有时需要在 goroutine 之间进行此类同步。 但是,有时你可能只需要实现并发,而不需要限制 goroutine 之间的通信方式。

有缓冲 channel 在不阻止程序的情况下发送和接收数据,因为有缓冲 channel 的行为类似于队列。 创建 channel 时,可以限制此队列的大小,如下所示:

ch := make(chan string, 10)

每次向 channel 发送数据时,都会将元素添加到队列中。 然后,接收操作将从队列中删除该元素。 当 channel 已满时,任何发送操作都将等待,直到有空间保存数据。 相反,如果 channel 是空的且存在读取操作,程序则会被阻止,直到有数据要读取。

下面是一个理解有缓冲 channel 的简单示例:

package main

import (
    "fmt"
)

func send(ch chan string, message string) {
    ch <- message
}

func main() {
    size := 4
    ch := make(chan string, size)
    send(ch, "one")
    send(ch, "two")
    send(ch, "three")
    send(ch, "four")
    fmt.Println("All data sent to the channel ...")

    for i := 0; i < size; i++ {
        fmt.Println(<-ch)
    }

    fmt.Println("Done!")
}

运行程序时,将看到以下输出:

All data sent to the channel ...
one
two
three
four
Done!

你可能会说我们在这里没有做任何不同的操作,你是对的。 但是让我们看看当你将 size 变量更改为一个更小的数字(你甚至可以尝试使用一个更大的数字)时会发生什么情况,如下所示:

size := 2

重新运行程序时,将看到以下错误:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.send(...)
        /Users/developer/go/src/concurrency/main.go:8
main.main()
        /Users/developer/go/src/concurrency/main.go:16 +0xf3
exit status 2

出现此错误是因为对 send 函数的调用是连续的。 你不是在创建新的 goroutine。 因此,没有任何要排队的操作。

channel 与 goroutine 有着紧密的联系。 如果没有另一个 goroutine 从 channel 接收数据,则整个程序可能会永久处于被阻止状态。 正如你所见,这种情况确实会发生。

现在让我们进行一些有趣的实践! 我们将为最后两次调用创建 goroutine (前两次调用正确适应缓冲区),并运行 for 循环四次。 代码如下:

func main() {
    size := 2
    ch := make(chan string, size)
    send(ch, "one")
    send(ch, "two")
    go send(ch, "three")
    go send(ch, "four")
    fmt.Println("All data sent to the channel ...")

    for i := 0; i < 4; i++ {
        fmt.Println(<-ch)
    }

    fmt.Println("Done!")
}

运行程序时,它按预期工作。 我们建议在使用 channel 时始终使用 goroutine。

让我们测试一下创建的缓冲通道的元素超出所需量的情况。 我们将使用之前用于检查 API 的示例,并创建大小为 10 的缓冲通道:

package main

import (
    "fmt"
    "net/http"
    "time"
)

func main() {
    start := time.Now()

    apis := []string{
        "https://management.azure.com",
        "https://dev.azure.com",
        "https://api.github.com",
        "https://outlook.office.com/",
        "https://api.somewhereintheinternet.com/",
        "https://graph.microsoft.com",
    }

    ch := make(chan string, 10)

    for _, api := range apis {
        go checkAPI(api, ch)
    }

    for i := 0; i < len(apis); i++ {
        fmt.Print(<-ch)
    }

    elapsed := time.Since(start)
    fmt.Printf("Done! It took %v seconds!\n", elapsed.Seconds())
}

func checkAPI(api string, ch chan string) {
    _, err := http.Get(api)
    if err != nil {
        ch <- fmt.Sprintf("ERROR: %s is down!\n", api)
        return
    }

    ch <- fmt.Sprintf("SUCCESS: %s is up and running!\n", api)
}

运行程序时,将看到与以前相同的输出。 可以更改 channel 的大小,用更小或更大的数字进行测试,程序仍能正常运行。

无缓冲 channel 与有缓冲 channel

现在,你可能想知道何时使用这两种类型。 这完全取决于你希望 goroutine 之间的通信如何进行。 无缓冲 channel 同步通信。 它们保证每次发送数据时,程序都会被阻止,直到有人从 channel 中读取数据。

相反,有缓冲 channel 将发送和接收操作解耦。 它们不会阻止程序,但你必须小心使用,因为可能最终会导致死锁(如前文所述)。 使用无缓冲 channel 时,可以控制可并发运行的 goroutine 的数量。 例如,你可能要对 API 进行调用,并且想要控制每秒执行的调用次数。 否则,你可能会被阻止。

Channel 方向

Go 中的通道具有另一个有趣的功能。 在使用通道作为函数的参数时,可以指定通道是要“发送”数据还是“接收”数据。 随着程序的增长,可能会使用大量的函数,这时候,最好记录每个 channel 的意图,以便正确使用它们。 或者,你要编写一个库,并希望将 channel 公开为只读,以保持数据一致性。

要定义 channel 的方向,可以使用与读取或接收数据时类似的方式进行定义。 但是你在函数参数中声明 channel 时执行此操作。 将通道类型定义为函数中的参数的语法如下所示:

chan<- int // it's a channel to only send data
<-chan int // it's a channel to only receive data

通过仅接收的 channel 发送数据时,在编译程序时会出现错误。

让我们使用以下程序作为两个函数的示例,一个函数用于读取数据,另一个函数用于发送数据:

package main

import "fmt"

func send(ch chan<- string, message string) {
    fmt.Printf("Sending: %#v\n", message)
    ch <- message
}

func read(ch <-chan string) {
    fmt.Printf("Receiving: %#v\n", <-ch)
}

func main() {
    ch := make(chan string, 1)
    send(ch, "Hello World!")
    read(ch)
}

运行程序时,将看到以下输出:

Sending: "Hello World!"
Receiving: "Hello World!"

程序阐明每个函数中每个 channel 的意图。 如果试图使用一个 channel 在一个仅用于接收数据的 channel 中发送数据,将会出现编译错误。 例如,尝试执行如下所示的操作:

func read(ch <-chan string) {
    fmt.Printf("Receiving: %#v\n", <-ch)
    ch <- "Bye!"
}

运行程序时,将看到以下错误:

# command-line-arguments
./main.go:12:5: invalid operation: ch <- "Bye!" (send to receive-only type <-chan string)

编译错误总比误用 channel 好。

多路复用

最后,让我们讨论如何使用 select 关键字与多个通道同时交互。 有时,在使用多个 channel 时,需要等待事件发生。 例如,当程序正在处理的数据中出现异常时,可以包含一些逻辑来取消操作。

select 语句的工作方式类似于 switch 语句,但它适用于 channel。 它会阻止程序的执行,直到它收到要处理的事件。 如果它收到多个事件,则会随机选择一个。

select 语句的一个重要方面是,它在处理事件后完成执行。 如果要等待更多事件发生,则可能需要使用循环。

让我们使用以下程序来看看 select 的运行情况:

package main

import (
    "fmt"
    "time"
)

func process(ch chan string) {
    time.Sleep(3 * time.Second)
    ch <- "Done processing!"
}

func replicate(ch chan string) {
    time.Sleep(1 * time.Second)
    ch <- "Done replicating!"
}

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)
    go process(ch1)
    go replicate(ch2)

    for i := 0; i < 2; i++ {
        select {
        case process := <-ch1:
            fmt.Println(process)
        case replicate := <-ch2:
            fmt.Println(replicate)
        }
    }
}

运行程序时,将看到以下输出:

Done replicating!
Done processing!

请注意,replicate 函数首先完成,这就是首先在终端中看到其输出的原因。 main 函数存在一个循环,因为 select 语句在收到事件后立即结束,但我们仍在等待 process 函数完成。