Синхронізація в Go: горутини, тести, варіанти
Привіт, мене звати Ярослав, займаюсь розробкою в компанії Evrius. Ця стаття про синхронізацію результатів від паралельно виконаних підзадач, призначена для спеціалістів-початківців та тих, хто планує перейти на Go.
На початку 2019 року, маючи досвід з Go, шукав нову роботу. Під час більшості співбесід ставили запитання, як розпаралелити виконання завдання. Приблизний опис завдання: є список посилань, треба за ними перейти, отримати результат та синхронізувати. Вирішення було достатньо для проходження технічної частини в пару аутсорс-компаній.
Вартість горутини
Кожен розробник, який використовує Go, знає, що горутини дешеві. Трохи менше знають, що розмір мінімального стека горутини змінювали в ранніх версіях Go, у версії 1.13.
_StackMin = 2048
А щоб перевірити горутини на швидкодію, напишемо тест, у якому запустимо N горутин з простим завданням, дочекаємося завершення й подивимося результати:
package benchmarks import ( "sync" "sync/atomic" "testing" ) func BenchmarkGoroutineCost(b *testing.B) { var value uint32 var wg sync.WaitGroup wg.Add(b.N) for i := 0; i < b.N; i++ { go func() { atomic.AddUint32(&value, 1) wg.Done() }() } wg.Wait() if value != uint32(b.N) { b.Errorf("expected %d, got %d", b.N, value) } }
go version
go version go1.13.3 linux/amd64
go test./benchmarks/... -v -bench=BenchmarkGoroutineCost -benchmem
BenchmarkGoroutineCost-4 3437931 351 ns/op 0 B/op 0 allocs/op
PASS
ok gitlab.com/go-yp/go-sync/benchmarks 1.563s
~350 наносекунд на створення, виконання, завершення горутини.
Спершу я в це повірив, але потім вирішив написати ще один тест, щоб перевірити відсутність оптимізації такої простої операції, а також те, що чисельність горутин під час виконання збільшується.
package benchmarks import ( "runtime" "sync" "sync/atomic" "testing" ) type goRuntimeMaxCount struct { mu sync.Mutex value int } func (c *goRuntimeMaxCount) update() { var value = runtime.NumGoroutine() c.mu.Lock() if value > c.value { c.value = value } c.mu.Unlock() } func (c *goRuntimeMaxCount) get() int { return c.value } func BenchmarkGoroutineCostDump(b *testing.B) { var ( value = uint32(0) wg = new(sync.WaitGroup) goRuntimeCount = new(goRuntimeMaxCount) ) b.Logf("before goroutine count %d", goRuntimeCount.get()) wg.Add(b.N) for i := 0; i < b.N; i++ { go func() { atomic.AddUint32(&value, 1) goRuntimeCount.update() wg.Done() }() } wg.Wait() if value != uint32(b.N) { b.Errorf("expected %d, got %d", b.N, value) } b.Logf("after goroutine count %d for b.N = %d", goRuntimeCount.get(), b.N) }
go test ./benchmarks/... -v -bench=BenchmarkGoroutineCostDump -benchmem
BenchmarkGoroutineCostDump-4 3223651 366 ns/op 0 B/op 0 allocs/op
before goroutine count 0
after goroutine count 904 for b.N = 1000000
before goroutine count 0
after goroutine count 3160 for b.N = 3223651
PASS
ok gitlab.com/go-yp/go-sync/benchmarks 1.562s
Тест показав, що горутини справді створюються. А тепер змінимо тест так, щоб на кожній ітерації циклу горутина створювалася і завершувалася:
func BenchmarkGoroutineCostOne(b *testing.B) { var value uint32 var wg sync.WaitGroup for i := 0; i < b.N; i++ { wg.Add(1) go func() { atomic.AddUint32(&value, 1) wg.Done() }() wg.Wait() } if value != uint32(b.N) { b.Errorf("expected %d, got %d", b.N, value) } } func BenchmarkGoroutineCostOneOverhead(b *testing.B) { var value uint32 var wg sync.WaitGroup for i := 0; i < b.N; i++ { wg.Add(1) atomic.AddUint32(&value, 1) wg.Done() wg.Wait() } if value != uint32(b.N) { b.Errorf("expected %d, got %d", b.N, value) } }
go test ./benchmarks/... -v -bench=BenchmarkGoroutineCostOne -benchmem
BenchmarkGoroutineCostOne-4 1488328 841 ns/op 0 B/op 0 allocs/op
BenchmarkGoroutineCostOneOverhead-4 32435746 34.6 ns/op 0 B/op 0 allocs/op
PASS
ok gitlab.com/go-yp/go-sync/benchmarks 3.229s
Тести проводив на домашньому Intel® Core™ i5-4210U CPU @ 1.70GHz, що показали вартість горутини ~800 наносекунд.
Для порівняння: знаходження максимального елементу в масиві з 1024 елементів ~1400 наносекунд.
Атомарність
Ми вже використовували пакет atomic, потрібний для паралельних операцій, щоб гарантувати їхню успішність.
Якщо з попередніх тестів забрати atomic і використовувати просте додавання:
func TestParallelPureIncrement(t *testing.T) { const n = 1000000 var ( value uint32 wg = new(sync.WaitGroup) ) wg.Add(n) for i := 0; i < n; i++ { go func() { value++ // same sa "value = value + 1" wg.Done() }() } wg.Wait() if value != n { t.Errorf("expected %d, got %d", n, value) } }
Отримаємо:
=== RUN TestParallelPureIncrement
--- FAIL: TestParallelPureIncrement (0.35s)
expected 1000000, got 851804
FAIL
FAIL gitlab.com/go-yp/go-sync/benchmarks 0.353s
Очікували 1 000 000, отримали 851 804, через відсутність синхронізації між горутинами.
На основі пакета atomic ґрунтуються інші структури в Go, що використовують для синхронізації. Так, у тесті замість WaitGroup.Wait, ми можемо використати циклічну перевірку:
func TestParallelPureAtomic(t *testing.T) { const n = 1000000 var value uint32 for i := 0; i < n; i++ { go func() { atomic.AddUint32(&value, 1) }() } for atomic.LoadUint32(&value) < n { // NOP } }
=== RUN TestParallelPureAtomic
--- PASS: TestParallelPureAtomic (0.42s)
PASS
ok gitlab.com/go-yp/go-sync/benchmarks 0.425s
і додати перемикання на інші горутини runtime.Gosched()
for atomic.LoadUint32(&value) < n { runtime.Gosched() }
Проте ліпше використовуйте sync.WaitGroup.
Синхронізація результатів від горутин і перегони даних (data race)
Якщо ви початківець у Go, а вам треба розпаралелити завдання, то ліпше напишіть тест для цього завдання, розпаралельте й запустіть з флагом -race, щоб перевірити наявність data race.
Data race — помилка проектування, стан програми, коли один або більше потоків змінюють дані без блокування й один або більше читають ці дані без блокування, у результаті програма працює інакше, ніж очікуємо.
Знайдемо функціонал, що розробили в однопотоковому варіанті й зі збільшенням завдань — розпаралелили.
Наприклад, візьмемо сторінку bestofjs.org/projects, де навпроти кожного проекту бачимо число зірок на GitHub-і.
Раз на день ці числа треба оновлювати, а в базі всього 20 репозиторіїв.
Є функція, щоб отримати число зірок за ID репозиторію:
func fetchRepositoryStarByID(id int) int { // emulate slow http request by sleep time.Sleep(100 * time.Millisecond) // emulate response var stars = id % 32 return stars }
і програма, яку запускають раз на день, щоб оновити число зірочок для кожного репозиторію:
package main import ( "fmt" "time" ) const repositoryCount = 20 type repository struct { id int starCount int } func main() { var startTime = time.Now() var ids = getRepositoryIDs() var repositories = fetchRepositoryStarsByIDs(ids) updateRepositoryStars(repositories) var duration = time.Since(startTime) fmt.Printf("fetch %d from %d repositories by %d \n", len(repositories), repositoryCount, duration) } func getRepositoryIDs() []int { return make([]int, repositoryCount) } func fetchRepositoryStarsByIDs(ids []int) []repository { var result = make([]repository, 0, len(ids)) for _, id := range ids { result = append(result, repository{ id: id, starCount: fetchRepositoryStarByID(id), }) } return result } func fetchRepositoryStarByID(id int) int { // emulate slow http request by sleep time.Sleep(100 * time.Millisecond) // emulate response var stars = id % 32 return stars } func updateRepositoryStars(repositories []repository) { // NOP }
Програма виконується за 2 секунди.
Опублікували React, Vue, Svelte й розширення до них, тепер у базі 100 репозиторіїв і програма виконується за 10 секунд.
З цим треба щось робити, бо чекаємо, що в проекті буде ще більше репозиторіїв, а отже, оновлення займатиме ще більше часу.
Вирішили розпаралелити з використанням горутин і WaitGroup.
Тепер fetchRepositoryStarsByIDs має такий вигляд:
package main import ( "fmt" "sync" "time" ) const repositoryCount = 100 // ... same func fetchRepositoryStarsByIDs(ids []int) []repository { var ( length = len(ids) result = make([]repository, 0, length) wg = new(sync.WaitGroup) ) wg.Add(length) for _, id := range ids { go func() { result = append(result, repository{ id: id, starCount: fetchRepositoryStarByID(id), }) wg.Done() }() } wg.Wait() return result }
виконується за 100 мілісекунд, але в консолі бачимо, що тільки частину репозиторіїв оновлено:
fetch 81 from 100 repositories by 112999451
Запустимо з флагом -race
go run -race main.go
й отримаємо повідомлення, у яких рядках коду є помилки (вивів тільки частину повідомлення):
WARNING: DATA RACE
Read at 0×00c00009a020 by goroutine 8:
main.fetchRepositoryStarsByIDs.func1()
gitlab.com/go-yp/go-sync/main.go:41 +0×91
У цьому коді відразу дві помилки з data race.
Перша помилка data race: змінна id змінюється в основній горутині, де виконується цикл for і читається зі створених горутин.
Ось приклад, що покаже цю помилку:
package main import ( "fmt" "sync" "time" ) func main() { var ids = []int{1, 2, 3} var wg = new(sync.WaitGroup) wg.Add(3) for _, id := range ids { go func() { time.Sleep(time.Millisecond) fmt.Printf("id is %d\n", id) wg.Done() }() } wg.Wait() }
id is 3
id is 3
id is 3
Варіанти розв’язання:
for _, id := range ids { go func(id int) { time.Sleep(time.Millisecond) fmt.Printf("id is %d\n", id) wg.Done() }(id) }
for _, id := range ids { id := id go func() { time.Sleep(time.Millisecond) fmt.Printf("id is %d\n", id) wg.Done() }() }
Друга помилка data race: це append, що змінює SliceHeader через append з багатьох горутин.
Є три відомі мені варіанти розв’язання проблеми data race під час збереження результатів від горутин.
У першому: ми ініціалізуємо slice, і кожна горутина пише у свій індекс:
func fetchRepositoryStarsByIDs(ids []int) []repository { var ( length = len(ids) result = make([]repository, length) wg = new(sync.WaitGroup) ) wg.Add(length) for i, id := range ids { go func(i, id int) { result[i] = repository{ id: id, starCount: fetchRepositoryStarByID(id), } wg.Done() }(i, id) } wg.Wait() return result }
Після запуску отримаємо очікуваний результат 100 зі 100 й без помилки data race.
fetch 100 from 100 repositories by 113026518
Якщо переглянути документацію — кожний елемент масиву як окрема змінна і в цьому прикладі з fetchRepositoryStarsByIDs кожна горутина працює зі своїм індексом (змінною), а отже, немає data race:
Structured variables of array, slice, and struct types have elements and fields that may be addressed individually. Each such element acts like a variable.
Це саме питання про запис у різні індекси є на stackoverflow.
Другий: обернути append в sync.Mutex:
package main import ( "fmt" "sync" "time" ) const repositoryCount = 100 // ... same func fetchRepositoryStarsByIDs(ids []int) []repository { var ( length = len(ids) result = make([]repository, 0, length) wg = new(sync.WaitGroup) mu = new(sync.Mutex) ) wg.Add(length) for _, id := range ids { go func(id int) { var starCount = fetchRepositoryStarByID(id) mu.Lock() result = append(result, repository{ id: id, starCount: starCount, }) mu.Unlock() wg.Done() }(id) } wg.Wait() return result }
Після запуску отримаємо очікуваний результат 100 зі 100 й без помилки data race (так само).
Жодної переваги перед першим варіантом.
Діє таке саме правило, що й у циклах:
- Якщо дію можна винести за цикл, так ліпше й зробити.
- Якщо дію можна винести за mutex, так ліпше й зробити.
Наприклад, написавши такий код:
go func(id int) { mu.Lock() var starCount = fetchRepositoryStarByID(id) result = append(result, repository{ id: id, starCount: starCount, }) mu.Unlock() wg.Done() }(id)
Програма буде заблокована під час виконання важкої операції fetchRepositoryStarByID і стане послідовною з часом виконання 10 секунд.
Третій: писати результати в канал, це стандартне рішення, бо канали створені для можливості писання й читання з багатьох горутин, без помилки data race:
func fetchRepositoryStarsByIDs(ids []int) []repository { var length = len(ids) // can also use channel with length, in this case result will be same // var resultChan = make(chan repository, length) var resultChan = make(chan repository) var wg = new(sync.WaitGroup) wg.Add(length) for _, id := range ids { go func(id int) { resultChan <- repository{ id: id, starCount: fetchRepositoryStarByID(id), } wg.Done() }(id) } go func() { wg.Wait() // close chan and break read loop close(resultChan) }() var repositories = make([]repository, 0, length) // read loop, while resultChan is open for result := range resultChan { repositories = append(repositories, result) } return repositories }
Перевага цього варіанта в тому, що замість збереження в slice ми можемо відразу починати опрацьовувати результати й навіть розпаралелити читання, якщо потрібно.
Є ще один варіант — комбінація першого й другого:
func fetchRepositoryStarsByIDs(ids []int) []repository { var ( length = len(ids) result = make([]repository, length) wg = new(sync.WaitGroup) index = int32(-1) ) wg.Add(length) for _, id := range ids { go func(id int) { newIndex := atomic.AddInt32(&index, 1) result[newIndex] = repository{ id: id, starCount: fetchRepositoryStarByID(id), } wg.Done() }(id) } wg.Wait() return result }
Throttling (rate limiting)
Проект із зірочками зростає і вже має 1000 репозиторіїв.
GitHub (або інший сервіс) починає повертати HTTP status 429 (too many requests) або HTTP Timeout замість зірок, коли є багато одночасних запитів.
У нашому прикладі ми додамо додатковий time.Sleep(time.Second) у fetchRepositoryStarByID, щоб емулювати затримку HTTP Timeout:
package main import ( "fmt" "runtime" "sync" "time" ) const repositoryCount = 1000 // ... same func fetchRepositoryStarByID(id int) int { // emulate timeout if runtime.NumGoroutine() > 250 { time.Sleep(time.Second) } // emulate slow http request by sleep time.Sleep(100 * time.Millisecond) // emulate response var stars = id % 32 return stars }
Запустимо:
go run main.go
fetch 1000 from 1000 repositories by 1232180912
Як бачимо, тепер код виконується за секунду.
Тепер обмежмо число одночасних запитів за раз до 200.
Розгляньмо теж 3 варіанти (якщо комбінувати з попередніми, то, звісно, буде більше).
Найпростіший варіант — через канали (можна також пошукати за словом semaphore):
package main import ( "fmt" "runtime" "sync" "time" ) const ( repositoryCount = 1000 requestLimit = 200 ) // ... same func fetchRepositoryStarsByIDs(ids []int) []repository { var ( length = len(ids) result = make([]repository, length) wg = new(sync.WaitGroup) // WE ADD THIS throttler = make(chan struct{}, requestLimit) ) wg.Add(length) for i, id := range ids { // WE ADD THIS throttler <- struct{}{} go func(i, id int) { result[i] = repository{ id: id, starCount: fetchRepositoryStarByID(id), } // WE ADD THIS <-throttler wg.Done() }(i, id) } wg.Wait() close(throttler) return result }
go run -race main.go
fetch 1000 from 1000 repositories by 535724238
За півсекунди.
Ми пишемо в канал 200 разів і запускаємо 200 горутин, запустити наступну зможемо тоді, коли хтось прочитає з каналу.
Варіант через запуск воркерів (workers).
Запускаємо N воркерів (де N — наша константа request Limit), що з одного каналу читають завдання, а в інший пишуть результат:
package main // ... same func workerFetchRepositoryStarByID(wg *sync.WaitGroup, requestIdChannel <-chan int, responseRepositoryChannel chan<- repository) { // read while requestIdChannel open for id := range requestIdChannel { var starCount = fetchRepositoryStarByID(id) responseRepositoryChannel <- repository{ id: id, starCount: starCount, } } wg.Done() } func fetchRepositoryStarsByIDs(ids []int) []repository { var ( length = len(ids) result = make([]repository, 0, length) workerCount = requestLimit ) if workerCount > length { workerCount = length } var ( requestIdChannel = make(chan int, workerCount) responseRepositoryChannel = make(chan repository, workerCount) workerComplete = new(sync.WaitGroup) readComplete = new(sync.WaitGroup) ) workerComplete.Add(workerCount) for i := 0; i < workerCount; i++ { go workerFetchRepositoryStarByID(workerComplete, requestIdChannel, responseRepositoryChannel) } readComplete.Add(1) go func() { for responseRepository := range responseRepositoryChannel { result = append(result, responseRepository) } readComplete.Done() }() for _, id := range ids { requestIdChannel <- id } close(requestIdChannel) workerComplete.Wait() close(responseRepositoryChannel) readComplete.Wait() return result }
go run -race main.go
fetch 1000 from 1000 repositories by 540738968
Так багато коду для синхронізації, бо ми визначили розміри каналів, що менше загальної чисельності репозиторіїв.
Цей код працюватиме навіть з каналами без буфера:
var ( requestIdChannel = make(chan int, 0) responseRepositoryChannel = make(chan repository, 0) )
Якщо ж використовувати ресурси пам’яті сповна, то код матиме простіший вигляд:
func fetchRepositoryStarsByIDs(ids []int) []repository { var ( length = len(ids) result = make([]repository, 0, length) workerCount = requestLimit ) if workerCount > length { workerCount = length } var ( requestIdChannel = make(chan int, length) responseRepositoryChannel = make(chan repository, length) workerComplete = new(sync.WaitGroup) ) workerComplete.Add(workerCount) for i := 0; i < workerCount; i++ { go workerFetchRepositoryStarByID(workerComplete, requestIdChannel, responseRepositoryChannel) } for _, id := range ids { requestIdChannel <- id } close(requestIdChannel) workerComplete.Wait() close(responseRepositoryChannel) for responseRepository := range responseRepositoryChannel { result = append(result, responseRepository) } return result }
і якщо потім захочемо змінити:
var ( requestIdChannel = make(chan int, 0) responseRepositoryChannel = make(chan repository, 0) )
то заблокуємо виконання назавжди, коли воркери почнуть писати в responseRepositoryChannel, а читання вже після завершення.
Останній варіант — розділити початковий slice на пачки й для кожної розпаралелити виконання.
Ми використаємо зовнішній пакет, що поділить на діапазони:
package main import ( "fmt" "github.com/gopereza/packer" "runtime" "sync" "time" ) const ( repositoryCount = 1000 requestLimit = 200 ) // ... same func fetchRepositoryStarsByIDs(ids []int) []repository { var ( length = len(ids) result = make([]repository, length) wg = new(sync.WaitGroup) ) var packs = packer.Pack(length, requestLimit) for _, pack := range packs { for i := pack.From; i < pack.To; i++ { wg.Add(1) go func(i int) { var id = ids[i] result[i] = repository{ id: id, starCount: fetchRepositoryStarByID(id), } wg.Done() }(i) } wg.Wait() } return result }
Схожий приклад бачив у реальному проекті.
Недолік такого рішення — одна тривала операція затримає виконання всієї пачки, тому переписував на варіант з воркерами.
Епілог
Ми в проекті вибрали варіант з воркерами, що відправляють пачками.
Репозиторій, у якому тестував варіанти.
Далі буде.