Синхронізація в Go: горутини, тести, варіанти

Привіт, мене звати Ярослав, займаюсь розробкою в компанії Evrius. Ця стаття про синхронізацію результатів від паралельно виконаних підзадач, призначена для спеціалістів-початківців та тих, хто планує перейти на Go.

На початку 2019 року, маючи досвід з Go, шукав нову роботу. Під час більшості співбесід ставили запитання, як розпаралелити виконання завдання. Приблизний опис завдання: є список посилань, треба за ними перейти, отримати результат та синхронізувати. Вирішення було достатньо для проходження технічної частини в пару аутсорс-компаній.

Вартість горутини

Кожен розробник, який використовує Go, знає, що горутини дешеві. Трохи менше знають, що розмір мінімального стека горутини змінювали в ранніх версіях Go, у версії 1.13.

_StackMin = 2048

А щоб перевірити горутини на швидкодію, напишемо тест, у якому запустимо N горутин з простим завданням, дочекаємося завершення й подивимося результати:

package benchmarks

import (
	"sync"
	"sync/atomic"
	"testing"
)

func BenchmarkGoroutineCost(b *testing.B) {
	var value uint32
	var wg sync.WaitGroup

	wg.Add(b.N)

	for i := 0; i < b.N; i++ {
		go func() {
			atomic.AddUint32(&value, 1)

			wg.Done()
		}()
	}

	wg.Wait()

	if value != uint32(b.N) {
		b.Errorf("expected %d, got %d", b.N, value)
	}
}

go version

go version go1.13.3 linux/amd64

go test./benchmarks/... -v -bench=BenchmarkGoroutineCost -benchmem

BenchmarkGoroutineCost-4 3437931 351 ns/op 0 B/op 0 allocs/op
PASS
ok gitlab.com/go-yp/go-sync/benchmarks 1.563s

~350 наносекунд на створення, виконання, завершення горутини.

Спершу я в це повірив, але потім вирішив написати ще один тест, щоб перевірити відсутність оптимізації такої простої операції, а також те, що чисельність горутин під час виконання збільшується.

package benchmarks

import (
	"runtime"
	"sync"
	"sync/atomic"
	"testing"
)

type goRuntimeMaxCount struct {
	mu    sync.Mutex
	value int
}

func (c *goRuntimeMaxCount) update() {
	var value = runtime.NumGoroutine()

	c.mu.Lock()
	if value > c.value {
		c.value = value
	}
	c.mu.Unlock()
}

func (c *goRuntimeMaxCount) get() int {
	return c.value
}

func BenchmarkGoroutineCostDump(b *testing.B) {
	var (
		value          = uint32(0)
		wg             = new(sync.WaitGroup)
		goRuntimeCount = new(goRuntimeMaxCount)
	)

	b.Logf("before goroutine count %d", goRuntimeCount.get())

	wg.Add(b.N)

	for i := 0; i < b.N; i++ {
		go func() {
			atomic.AddUint32(&value, 1)

			goRuntimeCount.update()

			wg.Done()
		}()
	}

	wg.Wait()

	if value != uint32(b.N) {
		b.Errorf("expected %d, got %d", b.N, value)
	}

	b.Logf("after goroutine count %d for b.N = %d", goRuntimeCount.get(), b.N)
}

go test ./benchmarks/... -v -bench=BenchmarkGoroutineCostDump -benchmem

BenchmarkGoroutineCostDump-4 3223651 366 ns/op 0 B/op 0 allocs/op
before goroutine count 0
after goroutine count 904 for b.N = 1000000
before goroutine count 0
after goroutine count 3160 for b.N = 3223651
PASS
ok gitlab.com/go-yp/go-sync/benchmarks 1.562s

Тест показав, що горутини справді створюються. А тепер змінимо тест так, щоб на кожній ітерації циклу горутина створювалася і завершувалася:

func BenchmarkGoroutineCostOne(b *testing.B) {
	var value uint32
	var wg sync.WaitGroup

	for i := 0; i < b.N; i++ {
		wg.Add(1)
		go func() {
			atomic.AddUint32(&value, 1)

			wg.Done()
		}()
		wg.Wait()
	}

	if value != uint32(b.N) {
		b.Errorf("expected %d, got %d", b.N, value)
	}
}

func BenchmarkGoroutineCostOneOverhead(b *testing.B) {
	var value uint32
	var wg sync.WaitGroup

	for i := 0; i < b.N; i++ {
		wg.Add(1)
		atomic.AddUint32(&value, 1)
		wg.Done()
		wg.Wait()
	}

	if value != uint32(b.N) {
		b.Errorf("expected %d, got %d", b.N, value)
	}
}

go test ./benchmarks/... -v -bench=BenchmarkGoroutineCostOne -benchmem

BenchmarkGoroutineCostOne-4 1488328 841 ns/op 0 B/op 0 allocs/op
BenchmarkGoroutineCostOneOverhead-4 32435746 34.6 ns/op 0 B/op 0 allocs/op
PASS
ok gitlab.com/go-yp/go-sync/benchmarks 3.229s

Тести проводив на домашньому Intel® Core™ i5-4210U CPU @ 1.70GHz, що показали вартість горутини ~800 наносекунд.

Для порівняння: знаходження максимального елементу в масиві з 1024 елементів ~1400 наносекунд.

Атомарність

Ми вже використовували пакет atomic, потрібний для паралельних операцій, щоб гарантувати їхню успішність.

Якщо з попередніх тестів забрати atomic і використовувати просте додавання:

func TestParallelPureIncrement(t *testing.T) {
	const n = 1000000

	var (
		value uint32
		wg    = new(sync.WaitGroup)
	)

	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			value++ // same sa "value = value + 1"

			wg.Done()
		}()
	}

	wg.Wait()

	if value != n {
		t.Errorf("expected %d, got %d", n, value)
	}
}

Отримаємо:

=== RUN TestParallelPureIncrement
--- FAIL: TestParallelPureIncrement (0.35s)
expected 1000000, got 851804
FAIL
FAIL gitlab.com/go-yp/go-sync/benchmarks 0.353s

Очікували 1 000 000, отримали 851 804, через відсутність синхронізації між горутинами.

На основі пакета atomic ґрунтуються інші структури в Go, що використовують для синхронізації. Так, у тесті замість WaitGroup.Wait, ми можемо використати циклічну перевірку:

func TestParallelPureAtomic(t *testing.T) {
	const n = 1000000

	var value uint32

	for i := 0; i < n; i++ {
		go func() {
			atomic.AddUint32(&value, 1)
		}()
	}

	for atomic.LoadUint32(&value) < n {
		// NOP
	}
}

=== RUN TestParallelPureAtomic
--- PASS: TestParallelPureAtomic (0.42s)
PASS
ok gitlab.com/go-yp/go-sync/benchmarks 0.425s

і додати перемикання на інші горутини runtime.Gosched()

for atomic.LoadUint32(&value) < n {
		runtime.Gosched()
	}

Проте ліпше використовуйте sync.WaitGroup.

Синхронізація результатів від горутин і перегони даних (data race)

Якщо ви початківець у Go, а вам треба розпаралелити завдання, то ліпше напишіть тест для цього завдання, розпаралельте й запустіть з флагом -race, щоб перевірити наявність data race.

Data race — помилка проектування, стан програми, коли один або більше потоків змінюють дані без блокування й один або більше читають ці дані без блокування, у результаті програма працює інакше, ніж очікуємо.

Знайдемо функціонал, що розробили в однопотоковому варіанті й зі збільшенням завдань — розпаралелили.

Наприклад, візьмемо сторінку bestofjs.org/projects, де навпроти кожного проекту бачимо число зірок на GitHub-і.

Раз на день ці числа треба оновлювати, а в базі всього 20 репозиторіїв.

Є функція, щоб отримати число зірок за ID репозиторію:

func fetchRepositoryStarByID(id int) int {
	// emulate slow http request by sleep
	time.Sleep(100 * time.Millisecond)

	// emulate response
	var stars = id % 32

	return stars
}

і програма, яку запускають раз на день, щоб оновити число зірочок для кожного репозиторію:

package main

import (
	"fmt"
	"time"
)

const repositoryCount = 20

type repository struct {
	id        int
	starCount int
}

func main() {
	var startTime = time.Now()

	var ids = getRepositoryIDs()
	var repositories = fetchRepositoryStarsByIDs(ids)

	updateRepositoryStars(repositories)

	var duration = time.Since(startTime)

	fmt.Printf("fetch %d from %d repositories by %d \n", len(repositories), repositoryCount, duration)
}

func getRepositoryIDs() []int {
	return make([]int, repositoryCount)
}

func fetchRepositoryStarsByIDs(ids []int) []repository {
	var result = make([]repository, 0, len(ids))

	for _, id := range ids {
		result = append(result, repository{
			id:        id,
			starCount: fetchRepositoryStarByID(id),
		})
	}

	return result
}

func fetchRepositoryStarByID(id int) int {
	// emulate slow http request by sleep
	time.Sleep(100 * time.Millisecond)

	// emulate response
	var stars = id % 32

	return stars
}

func updateRepositoryStars(repositories []repository) {
	// NOP
}

Програма виконується за 2 секунди.

Опублікували React, Vue, Svelte й розширення до них, тепер у базі 100 репозиторіїв і програма виконується за 10 секунд.

З цим треба щось робити, бо чекаємо, що в проекті буде ще більше репозиторіїв, а отже, оновлення займатиме ще більше часу.

Вирішили розпаралелити з використанням горутин і WaitGroup.

Тепер fetchRepositoryStarsByIDs має такий вигляд:

package main

import (
	"fmt"
	"sync"
	"time"
)

const repositoryCount = 100

// ... same

func fetchRepositoryStarsByIDs(ids []int) []repository {
	var (
		length = len(ids)
		result = make([]repository, 0, length)
		wg     = new(sync.WaitGroup)
	)

	wg.Add(length)

	for _, id := range ids {
		go func() {
			result = append(result, repository{
				id:        id,
				starCount: fetchRepositoryStarByID(id),
			})

			wg.Done()
		}()
	}

	wg.Wait()

	return result
}

виконується за 100 мілісекунд, але в консолі бачимо, що тільки частину репозиторіїв оновлено:

fetch 81 from 100 repositories by 112999451

Запустимо з флагом -race

go run -race main.go

й отримаємо повідомлення, у яких рядках коду є помилки (вивів тільки частину повідомлення):

WARNING: DATA RACE
Read at 0×00c00009a020 by goroutine 8:
main.fetchRepositoryStarsByIDs.func1()
gitlab.com/go-yp/go-sync/main.go:41 +0×91

У цьому коді відразу дві помилки з data race.

Перша помилка data race: змінна id змінюється в основній горутині, де виконується цикл for і читається зі створених горутин.

Ось приклад, що покаже цю помилку:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var ids = []int{1, 2, 3}
	var wg = new(sync.WaitGroup)

	wg.Add(3)
	for _, id := range ids {
		go func() {
			time.Sleep(time.Millisecond)

			fmt.Printf("id is %d\n", id)

			wg.Done()
		}()
	}

	wg.Wait()
}

id is 3
id is 3
id is 3

Варіанти розв’язання:

for _, id := range ids {
		go func(id int) {
			time.Sleep(time.Millisecond)

			fmt.Printf("id is %d\n", id)

			wg.Done()
		}(id)
	}
for _, id := range ids {
		id := id

		go func() {
			time.Sleep(time.Millisecond)

			fmt.Printf("id is %d\n", id)

			wg.Done()
		}()
	}

Друга помилка data race: це append, що змінює SliceHeader через append з багатьох горутин.

Є три відомі мені варіанти розв’язання проблеми data race під час збереження результатів від горутин.

У першому: ми ініціалізуємо slice, і кожна горутина пише у свій індекс:

func fetchRepositoryStarsByIDs(ids []int) []repository {
	var (
		length = len(ids)
		result = make([]repository, length)
		wg     = new(sync.WaitGroup)
	)

	wg.Add(length)

	for i, id := range ids {
		go func(i, id int) {
			result[i] = repository{
				id:        id,
				starCount: fetchRepositoryStarByID(id),
			}

			wg.Done()
		}(i, id)
	}

	wg.Wait()

	return result
}

Після запуску отримаємо очікуваний результат 100 зі 100 й без помилки data race.

fetch 100 from 100 repositories by 113026518

Якщо переглянути документацію — кожний елемент масиву як окрема змінна і в цьому прикладі з fetchRepositoryStarsByIDs кожна горутина працює зі своїм індексом (змінною), а отже, немає data race:

Structured variables of array, slice, and struct types have elements and fields that may be addressed individually. Each such element acts like a variable.

Це саме питання про запис у різні індекси є на stackoverflow.

Другий: обернути append в sync.Mutex:

package main

import (
	"fmt"
	"sync"
	"time"
)

const repositoryCount = 100

// ... same

func fetchRepositoryStarsByIDs(ids []int) []repository {
	var (
		length = len(ids)
		result = make([]repository, 0, length)
		wg     = new(sync.WaitGroup)
		mu     = new(sync.Mutex)
	)

	wg.Add(length)

	for _, id := range ids {
		go func(id int) {
			var starCount = fetchRepositoryStarByID(id)

			mu.Lock()
			result = append(result, repository{
				id:        id,
				starCount: starCount,
			})
			mu.Unlock()

			wg.Done()
		}(id)
	}

	wg.Wait()

	return result
}

Після запуску отримаємо очікуваний результат 100 зі 100 й без помилки data race (так само).

Жодної переваги перед першим варіантом.

Діє таке саме правило, що й у циклах:

  • Якщо дію можна винести за цикл, так ліпше й зробити.
  • Якщо дію можна винести за mutex, так ліпше й зробити.

Наприклад, написавши такий код:

go func(id int) {
	mu.Lock()
	var starCount = fetchRepositoryStarByID(id)

	result = append(result, repository{
		id:        id,
		starCount: starCount,
	})
	mu.Unlock()

	wg.Done()
}(id)

Програма буде заблокована під час виконання важкої операції fetchRepositoryStarByID і стане послідовною з часом виконання 10 секунд.

Третій: писати результати в канал, це стандартне рішення, бо канали створені для можливості писання й читання з багатьох горутин, без помилки data race:

func fetchRepositoryStarsByIDs(ids []int) []repository {
	var length = len(ids)
	// can also use channel with length, in this case result will be same
	// var resultChan = make(chan repository, length)
	var resultChan = make(chan repository)
	var wg = new(sync.WaitGroup)

	wg.Add(length)

	for _, id := range ids {
		go func(id int) {
			resultChan <- repository{
				id:        id,
				starCount: fetchRepositoryStarByID(id),
			}

			wg.Done()
		}(id)
	}

	go func() {
		wg.Wait()
		// close chan and break read loop
		close(resultChan)
	}()

	var repositories = make([]repository, 0, length)
	// read loop, while resultChan is open
	for result := range resultChan {
		repositories = append(repositories, result)
	}

	return repositories
}

Перевага цього варіанта в тому, що замість збереження в slice ми можемо відразу починати опрацьовувати результати й навіть розпаралелити читання, якщо потрібно.

Є ще один варіант — комбінація першого й другого:

func fetchRepositoryStarsByIDs(ids []int) []repository {
	var (
		length = len(ids)
		result = make([]repository, length)
		wg     = new(sync.WaitGroup)
		index  = int32(-1)
	)

	wg.Add(length)

	for _, id := range ids {
		go func(id int) {
			newIndex := atomic.AddInt32(&index, 1)

			result[newIndex] = repository{
				id:        id,
				starCount: fetchRepositoryStarByID(id),
			}

			wg.Done()
		}(id)
	}

	wg.Wait()

	return result
}

Throttling (rate limiting)

Проект із зірочками зростає і вже має 1000 репозиторіїв.

GitHub (або інший сервіс) починає повертати HTTP status 429 (too many requests) або HTTP Timeout замість зірок, коли є багато одночасних запитів.

У нашому прикладі ми додамо додатковий time.Sleep(time.Second) у fetchRepositoryStarByID, щоб емулювати затримку HTTP Timeout:

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

const repositoryCount = 1000

// ... same

func fetchRepositoryStarByID(id int) int {
	// emulate timeout
	if runtime.NumGoroutine() > 250 {
		time.Sleep(time.Second)
	}

	// emulate slow http request by sleep
	time.Sleep(100 * time.Millisecond)

	// emulate response
	var stars = id % 32

	return stars
}

Запустимо:

go run main.go

fetch 1000 from 1000 repositories by 1232180912

Як бачимо, тепер код виконується за секунду.

Тепер обмежмо число одночасних запитів за раз до 200.

Розгляньмо теж 3 варіанти (якщо комбінувати з попередніми, то, звісно, буде більше).

Найпростіший варіант — через канали (можна також пошукати за словом semaphore):

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

const (
	repositoryCount = 1000
	requestLimit    = 200
)

// ... same

func fetchRepositoryStarsByIDs(ids []int) []repository {
	var (
		length = len(ids)
		result = make([]repository, length)
		wg     = new(sync.WaitGroup)

		// WE ADD THIS
		throttler = make(chan struct{}, requestLimit)
	)

	wg.Add(length)

	for i, id := range ids {
		// WE ADD THIS
		throttler <- struct{}{}

		go func(i, id int) {
			result[i] = repository{
				id:        id,
				starCount: fetchRepositoryStarByID(id),
			}

			// WE ADD THIS
			<-throttler
			wg.Done()
		}(i, id)
	}

	wg.Wait()
	close(throttler)

	return result
}

go run -race main.go

fetch 1000 from 1000 repositories by 535724238

За півсекунди.

Ми пишемо в канал 200 разів і запускаємо 200 горутин, запустити наступну зможемо тоді, коли хтось прочитає з каналу.

Варіант через запуск воркерів (workers).

Запускаємо N воркерів (де N — наша константа request Limit), що з одного каналу читають завдання, а в інший пишуть результат:

package main

// ... same

func workerFetchRepositoryStarByID(wg *sync.WaitGroup, requestIdChannel <-chan int, responseRepositoryChannel chan<- repository) {
	// read while requestIdChannel open
	for id := range requestIdChannel {
		var starCount = fetchRepositoryStarByID(id)

		responseRepositoryChannel <- repository{
			id:        id,
			starCount: starCount,
		}
	}

	wg.Done()
}

func fetchRepositoryStarsByIDs(ids []int) []repository {
	var (
		length      = len(ids)
		result      = make([]repository, 0, length)
		workerCount = requestLimit
	)

	if workerCount > length {
		workerCount = length
	}

	var (
		requestIdChannel          = make(chan int, workerCount)
		responseRepositoryChannel = make(chan repository, workerCount)

		workerComplete = new(sync.WaitGroup)
		readComplete   = new(sync.WaitGroup)
	)

	workerComplete.Add(workerCount)
	for i := 0; i < workerCount; i++ {
		go workerFetchRepositoryStarByID(workerComplete, requestIdChannel, responseRepositoryChannel)
	}

	readComplete.Add(1)
	go func() {
		for responseRepository := range responseRepositoryChannel {
			result = append(result, responseRepository)
		}
		readComplete.Done()
	}()

	for _, id := range ids {
		requestIdChannel <- id
	}

	close(requestIdChannel)
	workerComplete.Wait()

	close(responseRepositoryChannel)
	readComplete.Wait()

	return result
}

go run -race main.go

fetch 1000 from 1000 repositories by 540738968

Так багато коду для синхронізації, бо ми визначили розміри каналів, що менше загальної чисельності репозиторіїв.

Цей код працюватиме навіть з каналами без буфера:

var (
	requestIdChannel          = make(chan int, 0)
	responseRepositoryChannel = make(chan repository, 0)
)

Якщо ж використовувати ресурси пам’яті сповна, то код матиме простіший вигляд:

func fetchRepositoryStarsByIDs(ids []int) []repository {
	var (
		length      = len(ids)
		result      = make([]repository, 0, length)
		workerCount = requestLimit
	)

	if workerCount > length {
		workerCount = length
	}

	var (
		requestIdChannel          = make(chan int, length)
		responseRepositoryChannel = make(chan repository, length)
		workerComplete            = new(sync.WaitGroup)
	)

	workerComplete.Add(workerCount)
	for i := 0; i < workerCount; i++ {
		go workerFetchRepositoryStarByID(workerComplete, requestIdChannel, responseRepositoryChannel)
	}

	for _, id := range ids {
		requestIdChannel <- id
	}
	close(requestIdChannel)

	workerComplete.Wait()

	close(responseRepositoryChannel)
	for responseRepository := range responseRepositoryChannel {
		result = append(result, responseRepository)
	}

	return result
}

і якщо потім захочемо змінити:

var (
	requestIdChannel          = make(chan int, 0)
	responseRepositoryChannel = make(chan repository, 0)
)

то заблокуємо виконання назавжди, коли воркери почнуть писати в responseRepositoryChannel, а читання вже після завершення.

Останній варіант — розділити початковий slice на пачки й для кожної розпаралелити виконання.

Ми використаємо зовнішній пакет, що поділить на діапазони:

package main

import (
	"fmt"
	"github.com/gopereza/packer"
	"runtime"
	"sync"
	"time"
)

const (
	repositoryCount = 1000
	requestLimit    = 200
)

// ... same

func fetchRepositoryStarsByIDs(ids []int) []repository {
	var (
		length = len(ids)
		result = make([]repository, length)
		wg     = new(sync.WaitGroup)
	)

	var packs = packer.Pack(length, requestLimit)

	for _, pack := range packs {
		for i := pack.From; i < pack.To; i++ {
			wg.Add(1)

			go func(i int) {
				var id = ids[i]

				result[i] = repository{
					id:        id,
					starCount: fetchRepositoryStarByID(id),
				}

				wg.Done()
			}(i)
		}

		wg.Wait()
	}

	return result
}

Схожий приклад бачив у реальному проекті.

Недолік такого рішення — одна тривала операція затримає виконання всієї пачки, тому переписував на варіант з воркерами.

Епілог

Ми в проекті вибрали варіант з воркерами, що відправляють пачками.

Репозиторій, у якому тестував варіанти.

Далі буде.

Похожие статьи:
Компания Microsoft, как и ожидалось, официально анонсировала смартфон Lumia 650. Информация действительно появилась в блоге Microsoft, специальное...
Хто розробляє такі цифрові рішення, як Армія+, DELTA, і як можна долучитися? Які системи з’являються на рівні бригад, підрозділів, чому...
Необходимость качественного и грамотного написания кода — вот одна из основополагающих вещей, которым мы обучаем будущих...
Международный музыкальный сервис Guvera объявил о большом обновлении своего приложения, которое, с его слов, значительно...
An area rug is often the piece that brings your room together, but as we all know, it often becomes dirty very quickly because everyone walks over it, food is spilt…not to mention any pets that you may have who...
Яндекс.Метрика