Синхронізація в Go: використання спільних даних
Привіт, мене звати Ярослав. Працюю в компанії Evrius, три роки розробляю на Go, а раніше писав на PHP.
Помітив, що коли на співбесіді з Go питають про синхронізацію, то переважно запитання звучить: «Як розпаралелити задачу?». Про це я писав раніше. Але на співбесіді питають про одне, а в проєкті — інше, там значно більше випадків, коли дані читаються з багатьох горутин, а оновлюють в одній. Тоді краще використовувати оптимальні структури sync.RWMutex та atomic.Value. Про це й буде стаття. Тут ви знайдете приклади коду, помилок, тести, бенчмарки.
Матеріал буде цікавий спеціалістам, які збираються перекваліфікуватись на Go або вже мають досвід з цієї мовою та хочуть краще структурувати свої знання.
Власне, я згадав про PHP, бо маю багато знайомих PHP-розробників і інколи відповідаю на запитання «Як перекваліфікуватись з PHP на Go». Також інколи чую, як PHP-розробникам зі знанням Go, рекрутери пропонують розглянути вакансії Golang Team Lead. Якщо вам буде цікаво почитати про те, як перекваліфікуватись з PHP на Go, дайте знати про це в коментарях чи напишіть мені в LinkedIn, щоб я розумів актуальність питання і мав мотивацію взятись за статтю з прикладами коду та порівнянням.
Безпечне читання даних без синхронізації
Коли в коді просто читаємо завчасно завантажені дані без оновлень з багатьох горутин, то синхронізація зайва. Штучний приклад, коли багато горутин читають одні дані:
package main import ( "fmt" ) func main() { var values = map[byte]int{ 'A': 1, 'B': 2, 'C': 3, } var keys = []byte{'A', 'B', 'C'} // panic: too many concurrent operations on a single file or socket (max 1048575) for try := 1048575; try > 0; try-- { go func() { for i, key := range keys { // safe goroutine read data without any sync var value = values[key] fmt.Printf("index = %d, key = %c, value = %d\n", i, key, value) } }() } }
Я запустив цей приклад з параметром race (для перевірки на data race): go run main.go -race
.
Якщо ж у першій горутині будемо читати з мапи, а в другій — писати в мапу, то отримаємо помилку fatal error: concurrent map read and map write. Про цю помилку також написано в офіційному блозі Go maps in action:
Maps are not safe for concurrent use: it’s not defined what happens when you read and write to them simultaneously.
Розглянемо її детальніше.
Fatal error: concurrent map read and map write
Якщо взяти попередній приклад і додати одну горутину, яка буде писати в мапу, то отримаємо помилку, навіть якщо будемо оновлювати вже наявні ключі.
// BAD CODE WITH ERROR EXAMPLE package main import ( "fmt" ) func main() { var values = map[byte]int{ 'A': 1, 'B': 2, 'C': 3, } var keys = []byte{'A', 'B', 'C'} go func() { for { for _, key := range keys { // UNSAFE, fatal error: concurrent map read and map write values[key] = values[key] + 1 } } }() // panic: too many concurrent operations on a single file or socket (max 1048575) for try := 1048575; try > 0; try-- { go func() { for i, key := range keys { // UNSAFE, fatal error: concurrent map read and map write var value = values[key] fmt.Printf("index = %d, key = %c, value = %d\n", i, key, value) } }() } }
go run main.go -race
Після запуску програма виведе в термінал очікувані повідомлення ~ index = 0, key = A, value = 850 N разів, а потім завершиться помилкою:
fatal error: concurrent map read and map write
Повна заміна мапи без синхронізації
Знову досліджуємо попередній приклад і замість оновлення мапи будемо робити її повну заміну. Перевіримо, чи це безпечно.
// BAD CODE WITH UNSAFE EXAMPLE package main import ( "fmt" ) func main() { var values = map[byte]int{ 'A': 1, 'B': 2, 'C': 3, } var keys = []byte{'A', 'B', 'C'} go func() { for { var replaceValues = make(map[byte]int, len(values)) for _, key := range keys { replaceValues[key] = values[key] + 1 } // UNSAFE replace on some GOARCH, for example arm(32) values = replaceValues } }() // panic: too many concurrent operations on a single file or socket (max 1048575) for try := 1048575; try > 0; try-- { go func() { for i, key := range keys { // UNSAFE read on some GOARCH, for example arm(32) var value = values[key] fmt.Printf("index = %d, key = %c, value = %d\n", i, key, value) } }() } }
go run main.go -race
Після запуску жодних помилок у терміналі, отже, повна заміна мапи виконалась успішно на моєму комп’ютері.
go env
GOARCH="amd64" GOHOSTARCH="amd64"
На архітектурі amd64 — повна заміна мапи (вказівник розміром 8 байтів, або 64 біти) є атомарною операцією (безпечною), але на інших архітектурах, таких як
Універсальний sync.Mutex
Завдання sync.Mutex — надати ексклюзивний доступ до даних за допомогою двох методів Lock() та Unlock(). Візьмемо попередні приклади і зробимо їх потокобезпечними, використовуючи sync.Mutex. Приклад, в якому була помилка fatal error: concurrent map read and map write з sync.Mutex, буде саме таким:
package main import ( "fmt" "sync" ) func main() { var values = map[byte]int{ 'A': 1, 'B': 2, 'C': 3, } var keys = []byte{'A', 'B', 'C'} var mu = new(sync.Mutex) go func() { for { for _, key := range keys { mu.Lock() values[key] = values[key] + 1 mu.Unlock() } } }() // panic: too many concurrent operations on a single file or socket (max 1048575) for try := 1048575; try > 0; try-- { go func() { for i, key := range keys { mu.Lock() var value = values[key] mu.Unlock() fmt.Printf("index = %d, key = %c, value = %d\n", i, key, value) } }() } }
go run main.go -race
Бачимо успішне завершення.
Але правильніше буде винести дані і мютекс в одну структуру, яка буде відповідати за доступ до даних. Об’єднати в одну структуру — це стандартне рішення, бо більше зрозуміло, які маніпуляції з даними відбуваються.
package main import ( "fmt" "sync" ) type SyncCounter struct { data map[byte]int mu sync.Mutex } func NewSyncCounter(data map[byte]int) *SyncCounter { return &SyncCounter{ data: data, } } func (c *SyncCounter) Increment(key byte) { c.mu.Lock() c.data[key] += 1 c.mu.Unlock() } func (c *SyncCounter) Get(key byte) int { c.mu.Lock() var value = c.data[key] c.mu.Unlock() return value } func main() { var values = NewSyncCounter(map[byte]int{ 'A': 1, 'B': 2, 'C': 3, }) var keys = []byte{'A', 'B', 'C'} go func() { for { for _, key := range keys { values.Increment(key) } } }() // panic: too many concurrent operations on a single file or socket (max 1048575) for try := 1048575; try > 0; try-- { go func() { for i, key := range keys { var value = values.Get(key) fmt.Printf("index = %d, key = %c, value = %d\n", i, key, value) } }() } }
На початку знайомства з Go універсального sync.Mutex та каналів вистачить для написання коду, але якщо захочете замінити на sync.RWMutex чи atomic.Value, то краще пишіть тести і запускайте з параметром race, щоб перевірити наявність помилок і можливе погіршення швидкодії після оптимізації.
Повна заміна даних з sync.Mutex, sync.RWMutex та atomic.Value
Далі зосередимось тільки на структурах з даними та потокобезпечним доступом. Коли повністю замінюємо дані, достатньо обгорнути мютексом тільки заміну і отримання вказівника:
import "sync" type CityOnlineMutexMap struct { data map[string]uint32 mu sync.Mutex } func NewCityOnlineMutexMap(data map[string]uint32) *CityOnlineMutexMap { return &CityOnlineMutexMap{ data: data, } } func (c *CityOnlineMutexMap) Update(data map[string]uint32) { c.mu.Lock() c.data = data c.mu.Unlock() } func (c *CityOnlineMutexMap) Get(cityName string) uint32 { c.mu.Lock() var data = c.data c.mu.Unlock() return data[cityName] }
У цьому прикладі пошук в мапі за ключем я виніс за мютекс, бо це безпечно. Коли дані частіше читаються, ніж пишуться, то можемо використати більш оптимальний для таких дій sync.RWMutex:
import "sync" type CityOnlineRWMutexMap struct { data map[string]uint32 mu sync.RWMutex } func NewCityOnlineRWMutexMap(data map[string]uint32) *CityOnlineRWMutexMap { return &CityOnlineRWMutexMap{ data: data, } } func (c *CityOnlineRWMutexMap) Update(data map[string]uint32) { c.mu.Lock() c.data = data c.mu.Unlock() } func (c *CityOnlineRWMutexMap) Get(cityName string) uint32 { c.mu.RLock() var data = c.data c.mu.RUnlock() return data[cityName] }
У цьому прикладі заміна буде відбуватись довше, бо під капотом RWMutex.Lock() викликається звичайний Mutex.Lock() та додаткові перевірки. Але читання через RWMutex.RLock швидше. Тести будуть далі.
Яка різниця між Mutex та RWMutex — одне зі стандартних питань на співбесіді. Якщо у вас є налаштований локально Go і IDE, то можете зайти в реалізацію RWMutex та почитати коментарі, які пояснюють, як працює RWMutex:
// RLock // A writer is pending, wait for it. // Lock // Announce to readers there is a pending writer. // Wait for active readers.
import ( "sync/atomic" ) type CityOnlineAtomicMap struct { data atomic.Value } func NewCityOnlineAtomicMap(data map[string]uint32) *CityOnlineAtomicMap { var result = new(CityOnlineAtomicMap) result.Update(data) return result } func (c *CityOnlineAtomicMap) Update(data map[string]uint32) { c.data.Store(data) } func (c *CityOnlineAtomicMap) Get(cityName string) uint32 { var data = c.data.Load().(map[string]uint32) return data[cityName] }
А тепер напишемо тест, щоб порівняти, яка структура найкраща для повної заміни даних:
package main import ( "sync" "testing" "time" ) type cityOnlineMap interface { Update(data map[string]uint32) Get(cityName string) uint32 } func BenchmarkCityOnlineMutexMap(b *testing.B) { benchmarkCityOnlineMap(b, NewCityOnlineMutexMap(getCityOnlineMap())) } func BenchmarkCityOnlineRWMutexMap(b *testing.B) { benchmarkCityOnlineMap(b, NewCityOnlineRWMutexMap(getCityOnlineMap())) } func BenchmarkCityOnlineAtomicMap(b *testing.B) { benchmarkCityOnlineMap(b, NewCityOnlineAtomicMap(getCityOnlineMap())) } func benchmarkCityOnlineMap(b *testing.B, data cityOnlineMap) { b.Helper() var once = new(sync.Once) var cities = []string{"kyiv", "kharkiv", "lviv", "dnipro", "odessa"} b.ResetTimer() b.RunParallel(func(pb *testing.PB) { var isWriter = false once.Do(func() { isWriter = true }) if isWriter { for pb.Next() { data.Update(getCityOnlineMap()) // read much more often than it is written time.Sleep(time.Microsecond) } } else { for pb.Next() { for _, cityName := range cities { _ = data.Get(cityName) } } } }) } func getCityOnlineMap() map[string]uint32 { var now = uint32(time.Now().Unix()) return map[string]uint32{ "kyiv": now, "kharkiv": now, "lviv": now, "dnipro": now, "odessa": now, } }
go test ./... -v -bench=. -benchmem
Назва тесту | Середній час ітерації | Виділення пам’яті |
BenchmarkCityOnlineMutexMap | 410 ns/op | 4 B/op 0 allocs/op |
BenchmarkCityOnlineRWMutexMap | 241 ns/op | 0 B/op 0 allocs/op |
BenchmarkCityOnlineAtomicMap | 10.4 ns/op | 0 B/op 0 allocs/op |
Якщо в задачі повна заміна даних, то краще використовувати atomic.Value як найефективнішу структуру (для цілочисельних даних є atomic.StoreUint64, atomic.StoreUint32, atomic.StoreInt64 та atomic.StoreInt32).
Екзотичні варіанти синхронізації через канали
Перший екзотичний варіант, який бачив у реальному проєкті:
// BAD CODE EXAMPLE, DON'T COPY-PASTE type CityOnlineChanMutexMap struct { data map[string]uint32 chanMutex chan struct{} } func NewCityOnlineChanMutexMap(data map[string]uint32) *CityOnlineChanMutexMap { return &CityOnlineChanMutexMap{ data: data, chanMutex: make(chan struct{}, 1), } } func (c *CityOnlineChanMutexMap) Update(data map[string]uint32) { c.chanMutex <- struct{}{} c.data = data _ = <-c.chanMutex } func (c *CityOnlineChanMutexMap) Get(cityName string) uint32 { c.chanMutex <- struct{}{} var result = c.data[cityName] _ = <-c.chanMutex return result }
Під капотом каналів мютекси, і варіант вище поки найповільніший. У проєкті переписав його на sync.Mutex.
Другий екзотичний варіант помітив на просторах інтернету:
// BAD CODE EXAMPLE, DON'T COPY-PASTE type cityOnlineRequest struct { cityName string online chan uint32 } type CityOnlineChanReactorMap struct { data map[string]uint32 requestChan chan cityOnlineRequest dataChan chan map[string]uint32 } func NewCityOnlineChanReactorMap(data map[string]uint32) *CityOnlineChanReactorMap { var result = &CityOnlineChanReactorMap{ data: data, requestChan: make(chan cityOnlineRequest), dataChan: make(chan map[string]uint32), } go result.run() return result } func (c *CityOnlineChanReactorMap) Update(data map[string]uint32) { c.dataChan <- data } func (c *CityOnlineChanReactorMap) Get(cityName string) uint32 { var request = cityOnlineRequest{ cityName: cityName, online: make(chan uint32), } c.requestChan <- request return <-request.online } func (c *CityOnlineChanReactorMap) run() { for { select { case request := <-c.requestChan: request.online <- c.data[request.cityName] case data := <-c.dataChan: c.data = data } } }
Цей варіант ще повільніший.
Назва тесту | Середній час ітерації | Виділення пам’яті |
BenchmarkCityOnlineMutexMap | 410 ns/op | 4 B/op 0 allocs/op |
BenchmarkCityOnlineRWMutexMap | 241 ns/op | 0 B/op 0 allocs/op |
BenchmarkCityOnlineAtomicMap | 10.4 ns/op | 0 B/op 0 allocs/op |
BenchmarkCityOnlineChanMutexMap | 2037 ns/op | 60 B/op 0 allocs/op |
BenchmarkCityOnlineChanReactorMap | 3740 ns/op | 467 B/op 4 allocs/op |
Якщо на вашому проєкті є щось схоже, можете відправити мені анонімно — і я додам в коментарях.
Повернення гетера
У попередніх прикладах розглядали взаємодію з даними всередині структури. Але якщо нам знадобиться отримати значення одразу для багатьох ключів, то робити синхронізацію на кожен ключ буде повільно. Якщо ж повернемо всю мапу, буде складніше розуміти, що далі відбувається з даними. Тож повернемо інтерфейс CityOnlineGetter:
import ( "sync/atomic" ) type CityOnlineGetter interface { Get(cityName string) uint32 } type CityOnlineMap struct { data map[string]uint32 } func (c *CityOnlineMap) Get(cityName string) uint32 { return c.data[cityName] } type CityOnlineAtomicMap struct { data atomic.Value } func NewCityOnlineAtomicMap(data map[string]uint32) *CityOnlineAtomicMap { var result = new(CityOnlineAtomicMap) result.Update(data) return result } func (c *CityOnlineAtomicMap) Update(data map[string]uint32) { c.data.Store(data) } func (c *CityOnlineAtomicMap) Get(cityName string) uint32 { var data = c.data.Load().(map[string]uint32) return data[cityName] } // BAD CODE //func (c *CityOnlineAtomicMap) Load() map[string]uint32 { // var data = c.data.Load().(map[string]uint32) // // return data //} func (c *CityOnlineAtomicMap) Load() CityOnlineGetter { var data = c.data.Load().(map[string]uint32) return &CityOnlineMap{ data: data, } }
Помилки, які можуть трапитися під час оновлення даних
Цю помилку, коли мютекс довго блокує виконання, бачив у реальних проєктах. Правильні приклади без помилок вже наведені у цій статті вище.
// BAD CODE EXAMPLE, DON'T COPY-PASTE import ( "sync" "time" ) type CityOnline struct { CityName string Online uint32 } type CityOnlineTooLongUpdateMap struct { data map[string]uint32 mu sync.RWMutex } func NewCityOnlineTooLongUpdateMap(data map[string]uint32) *CityOnlineTooLongUpdateMap { return &CityOnlineTooLongUpdateMap{ data: data, } } func (c *CityOnlineTooLongUpdateMap) UpdateBySlice(items []CityOnline) { c.mu.Lock() defer c.mu.Unlock() // other logic // for example API call, emulate by time.Sleep time.Sleep(time.Second) c.data = make(map[string]uint32, len(items)) for _, item := range items { c.data[item.CityName] = item.Online } } func (c *CityOnlineTooLongUpdateMap) Get(cityName string) uint32 { c.mu.RLock() defer c.mu.RUnlock() return c.data[cityName] }
У такому прикладі RLock буде чекати defer c.mu.Unlock(), відповідно читання буде заблоковано на секунду. Те саме стосується звичайного Mutex-а.
atomic.Value та збереження інших типів
Коли в статті розглядаєш тільки мапи, то виникає відчуття, що тільки з мапами atomic.Value і використовують. Що ж, для слайсів і звичайних структур також підходить:
import "sync/atomic" type CountrySettings struct { BlockCIDRs []string `json:"block_cidrs"` } type Settings struct { Countries map[string]CountrySettings `json:"countries"` PackSize uint32 `json:"pack_size"` Interval uint32 `json:"interval"` Debug bool `json:"debug"` } type SettingsProxy struct { data atomic.Value } func NewSettingsProxy() *SettingsProxy { return &SettingsProxy{} } func (s *SettingsProxy) Update(value Settings) { s.data.Store(value) } func (s *SettingsProxy) Load() (Settings, bool) { var result, ok = s.data.Load().(Settings) return result, ok }
import ( "github.com/stretchr/testify/require" "testing" ) func TestSettingsProxy(t *testing.T) { var proxy = NewSettingsProxy() { var settings, ok = proxy.Load() require.Equal(t, Settings{}, settings) require.Equal(t, false, ok) } { var expected = Settings{ Countries: map[string]CountrySettings{ "UA": { BlockCIDRs: nil, }, }, PackSize: 20000, Interval: 60, Debug: true, } proxy.Update(expected) var actual, ok = proxy.Load() require.Equal(t, expected, actual) require.Equal(t, true, ok) } }
У попередніх прикладах в конструкторі я робив збереження значення в atomic.Value, а тут навмисно пропустив, щоб вказати потребу перевірки при приведенні типу:
func (s *SettingsProxy) Load() (Settings, bool) { var result, ok = s.data.Load().(Settings) return result, ok }
Бо інакше буде паніка:
panic: interface conversion: interface {} is nil
Такий варіант теж допустимий:
func (s *SettingsProxy) Update(value *Settings) { s.data.Store(value) } func (s *SettingsProxy) Load() (*Settings, bool) { var result, ok = s.data.Load().(*Settings) return result, ok }
Застереження atomic.Value
Під час спроби зберегти nil чи різні типи отримуємо паніку. Ось тести, які показують таку поведінку:
func TestAtomicSuccessStoreNil(t *testing.T) { var atomicValue = new(atomic.Value) var value map[uint32]uint32 = nil atomicValue.Store(value) } func TestAtomicPanicOnStoreNil(t *testing.T) { defer func() { if r := recover(); r == nil { t.Errorf("The code did not panic") } }() var atomicValue = new(atomic.Value) // will panic atomicValue.Store(nil) } func TestAtomicPanicOnStoreNilInterface(t *testing.T) { defer func() { if r := recover(); r == nil { t.Errorf("The code did not panic") } }() var atomicValue = new(atomic.Value) var value interface{} = nil // will panic atomicValue.Store(value) } func TestAtomicPanicOnStoreDifferentTypes(t *testing.T) { defer func() { if r := recover(); r == nil { t.Errorf("The code did not panic") } }() var atomicValue = new(atomic.Value) { var value uint32 // will success atomicValue.Store(value) } { var value uint32 = 1 // will success atomicValue.Store(value) } { var value = "" // will panic atomicValue.Store(value) } }
Ці застереження написані в коментарях до коду atomic.Value.
atomic.Value та збереження int32, int64, uint32, uint64
Для числових типів можна використовувати й atomic.Value:
import "sync/atomic" type AtomicValueUint64 struct { data atomic.Value } func NewAtomicValueUint64(data uint64) *AtomicValueUint64 { var result = new(AtomicValueUint64) result.Update(data) return result } func (c *AtomicValueUint64) Update(data uint64) { c.data.Store(data) } func (c *AtomicValueUint64) Load() uint64 { return c.data.Load().(uint64) }
Але можна простіше:
import "sync/atomic" type AtomicUint64 struct { data uint64 } func NewAtomicUint64(value uint64) *AtomicUint64 { var result = new(AtomicUint64) result.Update(value) return result } func (c *AtomicUint64) Update(value uint64) { atomic.StoreUint64(&c.data, value) } func (c *AtomicUint64) Load() uint64 { return atomic.LoadUint64(&c.data) }
Є спеціальні функції в пакеті sync/atomic:
func LoadInt32(addr *int32) (val int32) {} func LoadInt64(addr *int64) (val int64) {} func LoadUint32(addr *uint32) (val uint32) {} func LoadUint64(addr *uint64) (val uint64) {} func StoreInt32(addr *int32, val int32) {} func StoreInt64(addr *int64, val int64) {} func StoreUint32(addr *uint32, val uint32) {} func StoreUint64(addr *uint64, val uint64) {}
Епілог
У статті є повно прикладів, які трохи відрізняються, щоб краще запам’ятати. Бо коли код простий, то сам приклад зрозуміліший за текстовий опис.
Схожі оптимізації з atomic.Value для повної заміни даних знадобляться під час написання бібліотек. У робочому закритому проєкті краще використовуйте RWMutex, бо конвертація типів — це джерело помилок.
А ще під кожний тип даних треба писати окрему обгортку з RWMutex, як було у прикладах, бо в Go відсутні дженерики.