Go 编程语言的最大优势之一是其内置的并发支持,基于 Tony Hoare 的“Communicating Sequential Processes”。作为一名具有 JS 和 Java 背景的开发人员,我对在 Go 中运行并发代码的轻松程度感到惊讶。
Go 和其他语言的并发区别
实际上,goroutines 不是线程。它们是绿色的线。让我们看看什么是绿线。
在 计算机编程中, 绿色线程 或 虚拟线程 是 由 运行时库 或 虚拟机(VM) 调度的线程, 而不是由底层 操作系统 (OS) 本机调度的线程。
–维基百科
绿色线程模拟多线程环境,不依赖任何本机操作系统功能,它们在 用户空间 而不是 内核 空间进行管理,使它们能够在不支持本机线程的环境中工作。Go 调度器负责在 goroutine 之间切换线程。因此,在绿色线程(在我们的例子中是 goroutines)之间切换上下文比 os 线程有效地便宜。goroutine 的初始堆栈大小是 2KB(并且会缩小),而不是OS 线程堆栈的约 8MB。
总结用户空间内部的go scheduler
工作go runtime
并使用操作系统线程。Goroutines 在操作系统线程的上下文中运行。
合作和先发制人
在 1.14 版本之前,Go 只有协作调度。这意味着 goroutine 自行决定何时出于任何原因释放资源(例如调用函数任何 IO 操作、等待互斥体、从通道读取等)。这可能会导致单个 goroutine 占用 CPU 的问题,并且不会达到上述任何原因。所以在 1.14 中引入了异步抢占。基于时间条件触发异步抢占。当一个 goroutine 运行超过 10 秒时,Go 调度程序将尝试抢占它。
让我们来看看它是如何工作的。首先,如何创建一个goroutine?
要创建一个 goroutine,我们需要使用如下关键字go
:
go func() {
//logic of concurrent function
}()
我不会深入研究指针在 Go 中的工作原理,但您可以在此处阅读并观看此内容。
我们的完整示例将如下所示:
package main
import (
"fmt"
"runtime"
)
func main() {
runtime.GOMAXPROCS(1)
i := 0
go func(i *int) {
for {
*i++
}
}(&i)
runtime.Gosched()
fmt.Println(i)
}
尝试使用低于 1.14 的版本运行它,然后检查上面的版本。使用低于该版本的程序将无休止地等待无限循环完成。下面的版本将从运行无限循环的 goroutine 中抢占资源并打印 i 的值。
频道
有时我们需要一种方式在 goroutine 之间进行通信。在 Go 中有一个特殊的口号:
不要通过共享内存进行通信;相反,通过通信共享内存。
这是什么意思?使用并发程序总是不容易,因为您应该始终牢记竞争条件、死锁和其他问题。Go 引入了处理这个问题的通道。Channel 是 goroutine 之间的一种通信方式。它有一个类型 (int, string, some struct) 并由关键字创建make
。
make(ch chan int)
要从通道写入或读取某些内容,有一种特殊的语法:
ch <- 2 // write
v := <- ch // read and assign result to variable v
通道可以缓冲也可以不缓冲。不同的是,当 goroutine 尝试写入具有空闲空间的缓冲通道时,goroutine 不会被阻塞,而是会继续执行。
您还可以遍历通道。
for v := range ch {
}
正如您可能假设的那样,如果通道中没有值,则执行将被阻塞,直到某个 goroutine 将值写入线程。
您也可以关闭通道,结果 for 循环将停止在关闭通道上的迭代。
close(ch)
网络爬虫
作为示例,让我们创建一个简单的函数来检查网站的状态
package main
import (
"fmt"
"net/http"
)
func main() {
websites := []string{
"https://code8cn.com/",
"https://github.com/",
"https://apple.com/",
"https://google.com/",
"https://youtube.com/",
"https://www.udemy.com/",
"https://netflix.com/",
"https://www.coursera.org/",
"https://facebook.com/",
"https://microsoft.com",
"https://wikipedia.org",
"https://educative.io",
"https://acloudguru.com",
}
for _, website := range websites {
checkResource(website)
}
}
func checkResource(website string) {
if res, err := http.Get(website); err != nil {
fmt.Println(website, "is down")
} else {
fmt.Printf("[%d] %s is up\n", res.StatusCode, website)
}
}
如果您将运行它,您将在控制台中看到这样的日志:
[200] https://code8cn.com/已上线 [200] https://github.com/已上线 [200] https://apple.com/已上线 [200] https://google.com/起来 [200]
https://www.youtube.com/embed/undefined已上涨 [200]
https://www.udemy.com/已上涨 [200]
https://netflix.com/已上涨 [200]
https://www.coursera.org/已上涨 [200]
https: //facebook.com/已上线 [200]
https://microsoft.com已上线 [200]
https://wikipedia.org已上线 [200]
https://education.io已上线 [200]
调用此代码大约需要 10 秒。问题当然是因为一个接一个地同时检查每个资源。现在让我们试着让它更快一点。为此,我们将使用工作池模式。您将使用一个 goroutine 池来管理正在执行的并发工作。使用 for 循环,您将创建一定数量的工作 goroutine 作为资源池。然后,在您的 main()“线程”中,您将使用一个通道来提供工作。
首先,我们需要为我们的案例定义一个工人。它看起来像:
func worker(resources, results chan string) {
for resource := range resources {
if res, err := http.Get(resource); err != nil {
results <- resource + " is down"
} else {
results <- fmt.Sprintf("[%d] %s is up", res.StatusCode, resource)
}
}
}
让我们快速找出这里到底发生了什么。每个工作人员都将等待来自频道的网站资源,resources
并且在有人将资源 URL 推送到频道工作人员之后,工作人员将收到此 URL 并检查它是否正常并将结果推送到另一个名为results
.
现在让我们看看我们将如何运行我们的工作池:
func main() {
websites := []string{
//...
}
resources := make(chan string, 6)
results := make(chan string)
for i := 0; i < 6; i++ {
go worker(resources, results)
}
}
我们的工作池包含 6 个 goroutine,它们正在运行并等待资源检查。在这里,我们可以将其用作单独的 goroutine IIF 或立即调用函数:
go func() {
for _, v := range websites {
resources <- v
}
}()
为什么我们不应该在这里使用同步内联代码?什么时候,你可能会尝试移除最后一个例子
go
,你会遇到一个deadlock。
现在我们不仅有我们的工人池,而且还为他们提供了工作:) 我们需要做的最后一件事是从池中读取结果。为此,我们可以遍历results
主 goroutine 中的通道并打印检查每个网站的所有结果:
for i := 0; i < len(websites); i++ {
fmt.Println(<-results)
}
完整的代码如下所示:
package main
import (
"fmt"
"net/http"
)
func main() {
websites := []string{
"https://code8cn.com/",
"https://github.com/",
"https://apple.com/",
"https://google.com/",
"https://youtube.com/",
"https://www.udemy.com/",
"https://netflix.com/",
"https://www.coursera.org/",
"https://facebook.com/",
"https://microsoft.com",
"https://wikipedia.org",
"https://educative.io",
"https://acloudguru.com",
}
resources := make(chan string, 6)
results := make(chan string)
for i := 0; i < 6; i++ {
go worker(resources, results)
}
go func() {
for _, v := range websites {
resources <- v
}
}()
for i := 0; i < len(websites); i++ {
fmt.Println(<-results)
}
}
func worker(resources, results chan string) {
for resource := range resources {
if res, err := http.Get(resource); err != nil {
results <- resource + " is down"
} else {
results <- fmt.Sprintf("[%d] %s is up", res.StatusCode, resource)
}
}
}
如果您运行它,您会发现它的调用速度比顺序版本快得多。您还可以使用 goroutine 的数量,看看它将如何影响执行速度。