Прежде чем перейти к статье, хочу вам представить, экономическую онлайн игру Brave Knights, в которой вы можете играть и зарабатывать. Регистируйтесь, играйте и зарабатывайте!
Привет!
Вэтой статье я хотел бы расcказать, как можно было бы сделать свой RWMutex, но с возможностью по таймауту или по срабатыванию контекста пропустить блокировку. То есть реализовать TryLock(context.Context) и RTryLock(context.Context), но уже для своего Mutex.
На картинке изображено, как нужно наливать воду в очень узкое горлышко.
Для начала следует уточнить, что для 99% задач такие методы вообще не нужны. Нужны они становятся тогда, когда блокируемый ресурс может очень долго не отпускаться. Хочу отметить, что если блокируемый ресурс долгое время остаётся занятым, стоит в начале попробовать оптимизировать логику таким образом, что бы минимизировать время блокировки.
Подробнее об этом написано в посте Танцы с мьютексами в Go в примере 2.
Но если всё же нам приходится иметь долгое удержание одним потоком ресурсов, то как мне кажется без TryLock будет сложно обойтись.
Когда мы работаем так, что у нас нет параллельных потоков, мы будем просто менять переменную методами из пакета atomic, записывая факт блокировки или разблокировки мьютекса. Но вот если наш запрос пришел в заблокированнй мьютекс, то мы пойдём по длинному сценарию. В этом случае нам нужно будет разблокировать ожидающие потоки по завершении блокировки, и для этого, можно воспользоваться каналом. А точнее свойством канала, что если мы его закрываем, то все читающие из него потоки разблокируются.
Создадим Mutex:
// RWTMutex - Read Write and Try Mutex
type RWTMutex struct {
state int32
mx sync.Mutex
ch chan struct{}
}
state — состояние mutex, мы будем работать с ним через atomic.AddInt32, atomic.LoadInt32 и atomic.CompareAndSwapInt32
ch — канал, который будет разблокировать потоки.
mx — мьютекс, который используется для того, что бы разблокировка потоков и создания каналов не происходили параллельно.
А теперь можно перейти к реализации:
// TryLock - try locks mutex with context
func (m *RWTMutex) TryLock(ctx context.Context) bool {
if atomic.CompareAndSwapInt32(&m.state, 0, -1) {
return true
}
// Slow way
return m.lockST(ctx)
}
// RTryLock - try read locks mutex with context
func (m *RWTMutex) RTryLock(ctx context.Context) bool {
k := atomic.LoadInt32(&m.state)
if k >= 0 && atomic.CompareAndSwapInt32(&m.state, k, k+1) {
return true
}
// Slow way
return m.rlockST(ctx)
}
Как можно увидеть, если Мьютекс не залочен, то его можно просто заблокировать, но вот если нет, то мы перейдём к более сложной схеме.
В начале получим канал, и переходим в бесконечный цикл, если получилось залочить, выходим с успехом, а если нет, то мы начинаем ждать одного из 2х событий, или что канал разблокируется, или что разблокирует поток ctx.Done():
func (m *RWTMutex) chGet() chan struct{} {
m.mx.Lock()
if m.ch == nil {
m.ch = make(chan struct{}, 1)
}
r := m.ch
m.mx.Unlock()
return r
}
func (m *RWTMutex) lockST(ctx context.Context) bool {
ch := m.chGet()
for {
if atomic.CompareAndSwapInt32(&m.state, 0, -1) {
return true
}
if ctx == nil {
return false
}
select {
case <-ch:
ch = m.chGet()
case <-ctx.Done():
return false
}
}
}
func (m *RWTMutex) rlockST(ctx context.Context) bool {
ch := m.chGet()
var k int32
for {
k = atomic.LoadInt32(&m.state)
if k >= 0 && atomic.CompareAndSwapInt32(&m.state, k, k+1) {
return true
}
if ctx == nil {
return false
}
select {
case <-ch:
ch = m.chGet()
case <-ctx.Done():
return false
}
}
}
Давайте раблокируем мьютекс.
Нам потребуется поменять состояние и при необходимости разблокировать канал.
Как я уже писал выше, если канал закрыть, то case <-ch пропустит поток выполнения дальше.
func (m *RWTMutex) chClose() {
if m.ch == nil {
return
}
var o chan struct{}
m.mx.Lock()
if m.ch != nil {
o = m.ch
m.ch = nil
}
m.mx.Unlock()
if o != nil {
close(o)
}
}
// Unlock - unlocks mutex
func (m *RWTMutex) Unlock() {
if atomic.CompareAndSwapInt32(&m.state, -1, 0) {
m.chClose()
return
}
panic("RWTMutex: Unlock fail")
}
// RUnlock - unlocks mutex
func (m *RWTMutex) RUnlock() {
i := atomic.AddInt32(&m.state, -1)
if i > 0 {
return
} else if i == 0 {
m.chClose()
return
}
panic("RWTMutex: RUnlock fail")
}
Собственно мьютекс готов, нужно к нему написать пару тестов и стандартных методов типа Lock() и RLock()
Бенчмарки на моей машине показали вот такие скорости
Описанные выше методы
BenchmarkRWTMutexTryLockUnlock-8 92154297 12.8 ns/op 0 B/op 0 allocs/op
BenchmarkRWTMutexTryRLockRUnlock-8 64337136 18.4 ns/op 0 B/op 0 allocs/op
Стандартный RWMutex
BenchmarkRWMutexLockUnlock-8 44187962 25.8 ns/op 0 B/op 0 allocs/op
BenchmarkRWMutexRLockRUnlock-8 94655520 12.6 ns/op 0 B/op 0 allocs/op
Стандартный Mutex
BenchmarkMutexLockUnlock-8 94345815 12.7 ns/op 0 B/op 0 allocs/op
То есть скорость работы сопоставима с обычным RWMutex и Mutex.
RWTMutex