Browse Source

fix lock table (#4863)

* adjust for better logs

* if shared lock, still need to wait for exclusive lock to release

* fix compilation

* fix waiting condition

* avoid deleting the entry too early
pull/4864/head
Chris Lu 1 year ago
committed by GitHub
parent
commit
76a62859ff
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 44
      weed/util/lock_table.go

44
weed/util/lock_table.go

@ -16,11 +16,12 @@ type LockTable[T comparable] struct {
} }
type LockEntry struct { type LockEntry struct {
mu sync.Mutex
waiters []*ActiveLock // ordered waiters that are blocked by exclusive locks
activeLockOwnerCount int32
lockType LockType
cond *sync.Cond
mu sync.Mutex
waiters []*ActiveLock // ordered waiters that are blocked by exclusive locks
activeSharedLockOwnerCount int32
activeExclusiveLockOwnerCount int32
lockType LockType
cond *sync.Cond
} }
type LockType int type LockType int
@ -34,6 +35,7 @@ type ActiveLock struct {
ID int64 ID int64
isDeleted bool isDeleted bool
intention string // for debugging intention string // for debugging
lockType LockType
} }
func NewLockTable[T comparable]() *LockTable[T] { func NewLockTable[T comparable]() *LockTable[T] {
@ -42,9 +44,9 @@ func NewLockTable[T comparable]() *LockTable[T] {
} }
} }
func (lt *LockTable[T]) NewActiveLock(intention string) *ActiveLock {
func (lt *LockTable[T]) NewActiveLock(intention string, lockType LockType) *ActiveLock {
id := atomic.AddInt64(&lt.lockIdSeq, 1) id := atomic.AddInt64(&lt.lockIdSeq, 1)
l := &ActiveLock{ID: id, intention: intention}
l := &ActiveLock{ID: id, intention: intention, lockType: lockType}
return l return l
} }
@ -59,13 +61,13 @@ func (lt *LockTable[T]) AcquireLock(intention string, key T, lockType LockType)
} }
lt.mu.Unlock() lt.mu.Unlock()
lock = lt.NewActiveLock(intention)
lock = lt.NewActiveLock(intention, lockType)
// If the lock is held exclusively, wait // If the lock is held exclusively, wait
entry.mu.Lock() entry.mu.Lock()
if len(entry.waiters) > 0 || lockType == ExclusiveLock {
if len(entry.waiters) > 0 || lockType == ExclusiveLock || entry.activeExclusiveLockOwnerCount > 0 {
if glog.V(4) { if glog.V(4) {
fmt.Printf("ActiveLock %d %s wait for %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeLockOwnerCount)
fmt.Printf("ActiveLock %d %s wait for %+v type=%v with waiters %d active r%d w%d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeSharedLockOwnerCount, entry.activeExclusiveLockOwnerCount)
if len(entry.waiters) > 0 { if len(entry.waiters) > 0 {
for _, waiter := range entry.waiters { for _, waiter := range entry.waiters {
fmt.Printf(" %d", waiter.ID) fmt.Printf(" %d", waiter.ID)
@ -75,11 +77,11 @@ func (lt *LockTable[T]) AcquireLock(intention string, key T, lockType LockType)
} }
entry.waiters = append(entry.waiters, lock) entry.waiters = append(entry.waiters, lock)
if lockType == ExclusiveLock { if lockType == ExclusiveLock {
for !lock.isDeleted && ((len(entry.waiters) > 0 && lock.ID != entry.waiters[0].ID) || entry.activeLockOwnerCount > 0) {
for !lock.isDeleted && ((len(entry.waiters) > 0 && lock.ID != entry.waiters[0].ID) || entry.activeExclusiveLockOwnerCount > 0 || entry.activeSharedLockOwnerCount > 0) {
entry.cond.Wait() entry.cond.Wait()
} }
} else { } else {
for !lock.isDeleted && (len(entry.waiters) > 0 && lock.ID != entry.waiters[0].ID) {
for !lock.isDeleted && (len(entry.waiters) > 0 && lock.ID != entry.waiters[0].ID) || entry.activeExclusiveLockOwnerCount > 0 {
entry.cond.Wait() entry.cond.Wait()
} }
} }
@ -89,12 +91,11 @@ func (lt *LockTable[T]) AcquireLock(intention string, key T, lockType LockType)
entry.cond.Broadcast() entry.cond.Broadcast()
} }
} }
entry.activeLockOwnerCount++
// Otherwise, grant the lock // Otherwise, grant the lock
entry.lockType = lockType entry.lockType = lockType
if glog.V(4) { if glog.V(4) {
fmt.Printf("ActiveLock %d %s locked %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeLockOwnerCount)
fmt.Printf("ActiveLock %d %s locked %+v type=%v with waiters %d active r%d w%d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeSharedLockOwnerCount, entry.activeExclusiveLockOwnerCount)
if len(entry.waiters) > 0 { if len(entry.waiters) > 0 {
for _, waiter := range entry.waiters { for _, waiter := range entry.waiters {
fmt.Printf(" %d", waiter.ID) fmt.Printf(" %d", waiter.ID)
@ -102,6 +103,11 @@ func (lt *LockTable[T]) AcquireLock(intention string, key T, lockType LockType)
fmt.Printf("\n") fmt.Printf("\n")
} }
} }
if lock.lockType == ExclusiveLock {
entry.activeExclusiveLockOwnerCount++
} else {
entry.activeSharedLockOwnerCount++
}
entry.mu.Unlock() entry.mu.Unlock()
return lock return lock
@ -129,12 +135,12 @@ func (lt *LockTable[T]) ReleaseLock(key T, lock *ActiveLock) {
} }
// If there are no waiters, release the lock // If there are no waiters, release the lock
if len(entry.waiters) == 0 {
if len(entry.waiters) == 0 && entry.activeExclusiveLockOwnerCount <= 0 && entry.activeSharedLockOwnerCount <= 0 {
delete(lt.locks, key) delete(lt.locks, key)
} }
if glog.V(4) { if glog.V(4) {
fmt.Printf("ActiveLock %d %s unlocked %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, entry.lockType, len(entry.waiters), entry.activeLockOwnerCount)
fmt.Printf("ActiveLock %d %s unlocked %+v type=%v with waiters %d active r%d w%d.\n", lock.ID, lock.intention, key, entry.lockType, len(entry.waiters), entry.activeSharedLockOwnerCount, entry.activeExclusiveLockOwnerCount)
if len(entry.waiters) > 0 { if len(entry.waiters) > 0 {
for _, waiter := range entry.waiters { for _, waiter := range entry.waiters {
fmt.Printf(" %d", waiter.ID) fmt.Printf(" %d", waiter.ID)
@ -142,7 +148,11 @@ func (lt *LockTable[T]) ReleaseLock(key T, lock *ActiveLock) {
fmt.Printf("\n") fmt.Printf("\n")
} }
} }
entry.activeLockOwnerCount--
if lock.lockType == ExclusiveLock {
entry.activeExclusiveLockOwnerCount--
} else {
entry.activeSharedLockOwnerCount--
}
// Notify the next waiter // Notify the next waiter
entry.cond.Broadcast() entry.cond.Broadcast()

Loading…
Cancel
Save