我希望我早知道的 Golang 中的 3 个陷阱
在过去的一年里,我们一直在开发一个复杂的半实时生产系统。我们决定用 Golang 编写它。我们在 Go 方面几乎没有经验,所以你可能会想象这不是微不足道的。
快进一年:该系统正在生产中运行,并成为 ClimaCell 产品的主要支柱之一。
精通意味着您有足够的经验来了解您正在使用的平台的陷阱以及如何避免它们。
我想描述我们在使用 Golang 的过程中遇到的三个陷阱,希望它能帮助您避免它们。
范围可变性
考虑以下示例:
package main
import (
"fmt"
"sync"
)
type A struct {
id int
}
func main() {
channel := make(chan A, 5)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for a := range channel {
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println(a.id)
}()
}
}()
for i := 0; i < 10; i++ {
channel <- A{id:i}
}
close(channel)
wg.Wait()
}
我们有一个保存结构实例的通道。我们用操作符遍历通道 range
。你认为这段代码的输出会是什么?
6
6
6
6
6
9
9
9
9
9
是不是很奇怪?我们希望看到数字 1-9(当然没有排序)。
我们实际看到的是循环变量可变性的结果:
在每次迭代中,我们都会得到一个可以使用的结构实例。结构是值类型——它们在每次迭代中被复制到 for 迭代变量中。这里的关键词是copy。为了避免大内存打印,不是在每次迭代中创建变量的新实例,而是在循环开始时创建一个实例,并在每次迭代中将数据复制到其上。
闭包是等式的另一部分:Go 中的闭包(像大多数语言一样),持有闭包中对象的引用(不复制数据),因此内部 go 例程获取迭代对象的引用,这意味着所有go 例程获得对同一实例的相同引用。
解决方案
首先要知道会发生这种情况。它不是微不足道的,因为它与其他语言的行为完全不同(for-each
在 C# 中,for-of
在 JS 中 – 在那些循环变量是不可变的)
为避免这种陷阱,请在循环范围内捕获变量,从而自己创建一个新实例,然后根据需要使用它:
go func() {
defer wg.Done()
for a := range channel {
wg.Add(1)
go func(item A) {
defer wg.Done()
fmt.Println(item.id)
}(a) // Capture happens here
}
}()
在这里,我们使用内部 go 例程的函数调用来捕获a
– 有效地复制它。它也可以显式复制:
for a := range channel {
wg.Add(1)
item := a // Capture happens here
go func() {
defer wg.Done()
fmt.Println(item.id)
}()
}
笔记
- 对于大型数据集,请注意捕获循环变量会创建大量对象,每个对象都会保存到底层的 go 例程将被执行,因此如果对象包含多个字段,请考虑仅捕获所需字段以执行内部例程
for-range
作为阵列的附加表现形式。它还创建一个索引循环变量。请注意,索引循环变量也是可变的。即在 go 例程中使用它,以与使用 value 循环变量相同的方式捕获它- 在当前的 Go 版本(1.15)上,我们看到的初始代码实际上会抛出错误!帮助我们避免这个问题并强制我们捕获我们需要的数据
当心:=
GoLang 有两个赋值运算符,=
和:=
:
var num int
num = 3
name := "yossi"
:=
非常有用,允许我们在赋值之前避免变量声明。它实际上是当今许多类型语言中的一种常见做法(例如var
在 C# 中)。它非常方便并且保持代码更干净(以我的拙见)。
但尽管如此可爱,当与 GoLang 中的其他一些行为、范围和多个返回值结合使用时,我们可能会遇到意想不到的行为。考虑以下示例:
package main
import (
"fmt"
)
func main() {
var data []string
data, err := getData()
if err != nil {
panic("ERROR!")
}
for _, item := range data {
fmt.Println(item)
}
}
func getData() ([]string, error) {
// Simulating getting the data from a datasource - lets say a DB.
return []string{"there","are","no","strings","on","me"}, nil
}
在这个例子中,我们从某个地方读取一个字符串数组并打印它:
there
are
no
strings
on
me
请注意以下用途:=
:
data, err := getData()
请注意,即使data
已经声明了我们仍然可以使用:=
因为err
is not – 很好的简写,它创建了一个更清晰的代码。
现在让我们稍微修改一下代码:
func main() {
var data []string
killswitch := os.Getenv("KILLSWITCH")
if killswitch == "" {
fmt.Println("kill switch is off")
data, err := getData()
if err != nil {
panic("ERROR!")
}
fmt.Printf("Data was fetched! %d\n", len(data))
}
for _, item := range data {
fmt.Println(item)
}
}
你认为这段代码的结果是什么?
kill switch is off
Data was fetched! 6
很奇怪,不是吗?由于关闭开关,我们确实加载了数据——我们甚至打印了它的长度。那么为什么代码不像以前那样打印出来呢?
你猜对了——因为:=
!
GoLang 中的范围(像大多数现代语言一样)是用{}
. 在这里,这if
创建了一个新范围:
if killswitch == "" {
...
}
因为我们使用:=
,Go 会将data
和都err
视为新变量!即data
在 if 子句中实际上是一个新变量,当作用域关闭时它被丢弃。
我们在初始化流程中多次遇到过此类行为 – 通常会公开某种包变量,完全按照此处所述进行初始化,并带有一个终止开关以允许我们禁用生产中的某些行为。上面的实现会导致系统的状态无效。
解决方案
意识——我已经说过了吗?:)
在某些情况下,如果不使用子句中的内部变量,Go 编译器会发出警告甚至错误,if
例如:
if killswitch == "" {
fmt.Println("kill switch is off")
data, err := getData()
if err != nil {
panic("ERROR!")
}
}
// Will issue an error :
data declared but not used
因此,请注意编译时的警告。
尽管如此,有时我们确实在范围内使用了变量,因此不会发出错误。
无论如何,最好的做法是尽量避免:=
速记——尤其是当它与多个返回值和错误处理有关时,在决定使用它时要格外注意:
func main() {
var data []string
var err error // Declaring err to make sure we can use = instead of :=
killswitch := os.Getenv("KILLSWITCH")
if killswitch == "" {
fmt.Println("kill switch is off")
data, err = getData()
if err != nil {
panic("ERROR!")
}
fmt.Printf("Data was fetched! %d\n", len(data))
}
for _, item := range data {
fmt.Println(item)
}
}
将导致:
kill switch is off
Data was fetched! 6
there
are
no
strings
on
me
请记住,随着代码的发展,不同的开发人员会对其进行修改。以前不在不同范围内的代码可能会在未来出现。修改现有代码时请留意,尤其是在将其移至不同范围时。
工人池。工人池船长
考虑以下示例:
package main
import (
"fmt"
"sync"
"time"
)
type A struct {
id int
}
func main() {
start := time.Now()
channel := make(chan A, 100)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for a := range channel {
process(a)
}
}()
for i := 0; i < 100; i++ {
channel <- A{id:i}
}
close(channel)
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("Took %s\n", elapsed)
}
func process(a A) {
fmt.Printf("Start processing %v\n", a)
time.Sleep(100 * time.Millisecond)
fmt.Printf("Finish processing %v\n", a)
}
和以前一样,我们for-range
在通道上有一个循环。假设该process
函数包含我们需要运行的算法,并且速度不是很快。如果我们处理 100,000 个项目,上面的代码将运行将近三个小时(process
在示例中运行 100 毫秒)。因此,让我们这样做:
package main
import (
"fmt"
"sync"
"time"
)
type A struct {
id int
}
func main() {
start := time.Now()
channel := make(chan A, 100)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for a := range channel {
wg.Add(1)
go func(a A) {
defer wg.Done()
process(a)
}(a)
}
}()
for i := 0; i < 100; i++ {
channel <- A{id:i}
}
close(channel)
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("Took %s\n", elapsed)
}
func process(a A) {
fmt.Printf("Start processing %v\n", a)
time.Sleep(100 * time.Millisecond)
fmt.Printf("Finish processing %v\n", a)
}
我们不是按顺序处理项目,而是为通道中的每个项目调度一个 Go 例程。我们想利用 Go 惊人的并发处理来帮助我们更快地处理数据:
项目 | 没有 GO 例程 | 使用 GO 例程 |
---|---|---|
100 | 10s | 100毫秒 |
从理论上讲,这也适用于 100K 项目,对吧?
不幸的是,答案是“视情况而定”。
要理解为什么,我们需要了解当我们调度一个 goroutine 时会发生什么。我不会深入探讨它,因为它超出了本文的范围。简而言之,运行时创建一个对象,其中包含与 go 例程相关的所有数据并存储它。当 go 例程的执行完成时,它会被驱逐。goroutine 对象的最小大小是 2K,但它可以达到 1GB(在 64 位机器上)。
到现在为止,你可能知道我们要去哪里了——我们创建的 goroutine 越多,我们创建的对象就越多,因此内存消耗也在增加。此外,go 例程需要 CPU 的执行时间来执行实际执行,因此我们拥有的内核越少,这些对象中的越多将保留在内存中等待执行。
在资源匮乏的环境(Lambda 函数、有限制的 K8s pod)上,CPU 和内存都是有限的,即使在 100K 的 go 例程中,代码示例也会对内存造成压力(同样,取决于实例可用的内存量) . 在我们的例子中,在具有 128MB 内存的 Cloud 函数上,我们能够在崩溃之前处理大约 100K 项。
请注意,从应用程序的角度来看,我们需要的实际数据非常小——在本例中,是一个简单的 int。大部分内存消耗是 goroutine 本身。
解决方案
工人池!
工作池允许我们管理我们拥有的 goroutine 的数量,从而保持低内存打印。让我们看看与工作池相同的示例:
package main
import (
"fmt"
"sync"
"time"
)
type A struct {
id int
}
func main() {
start := time.Now()
workerPoolSize := 100
channel := make(chan A, 100)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for i := 0;i < workerPoolSize;i++ {
wg.Add(1)
go func() {
defer wg.Done()
for a := range channel {
process(a)
}
}()
}
}()
// Feeding the channel
for i := 0; i < 100000; i++ {
channel <- A{id:i}
}
close(channel)
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("Took %s\n", elapsed)
}
func process(a A) {
fmt.Printf("Start processing %v\n", a)
time.Sleep(100 * time.Millisecond)
fmt.Printf("Finish processing %v\n", a)
}
我们将工作池的数量限制为 100,并为每个人创建了一个 goroutine:
go func() {
defer wg.Done()
for i := 0;i < workerPoolSize;i++ {
wg.Add(1)
go func() { // Go routine per worker
defer wg.Done()
for a := range channel {
process(a)
}
}()
}
}()
将通道想象成一个队列,其中每个工作人员执行例程都是队列的消费者。Go 的频道允许多个 go 例程监听同一个频道,其中频道中的每个项目将被处理一次。
好的一面
我们现在可以计划我们的环境,因为内存打印现在是预期的并且可以测量:
the size of the worker pool * expected size of a single go routine (min 2K)
不足之处
执行时间会增加。当我们限制内存使用时,我们会通过增加执行时间来支付它。为什么?之前我们为每个项目发送了一个 goroutine 来处理 – 有效地为每个项目创建消费者。实际上给了我们无限的规模和高并发。
实际上,它是不正确的,因为 go 例程的执行取决于运行应用程序的内核的可用性。这意味着我们必须根据我们运行的平台优化工作人员的数量,但在大容量系统中这样做是有意义的。
总结
工作池让我们可以更好地控制代码的执行。它们使我们具有可预测性,因此我们可以计划和优化我们的代码和平台,以扩展到高吞吐量和大量数据。
我建议在应用程序需要迭代数据集(即使是小数据集)时始终使用工作池。使用工作池,我们现在能够在云功能上处理数百万个项目,甚至没有接近平台启用的限制,为我们提供了足够的扩展空间。
笔记
- 工作人员的数量应该是可配置的(例如环境变量),以允许您使用数量并在您运行的每个平台上达到您想要的结果
- 将通道大小至少设置为池中的工作人员数量 – 这将允许数据生产者填充队列,并防止工作人员在生成数据时等待空闲。使其也可配置。
结论
使我们成为更好的专业人士的是从错误中学习的能力。但是向别人学习也同样重要。
如果你能走到这一步 – 谢谢!
我希望我们在这里看到的内容能帮助你,亲爱的读者,避免我们在使用 GoLang 的过程中所犯的错误。