You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

169 lines
4.5 KiB

1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
  1. package util
  2. import (
  3. "fmt"
  4. "sync"
  5. "sync/atomic"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. )
  8. // LockTable is a table of locks that can be acquired.
  9. // Locks are acquired in order of request.
  10. type LockTable[T comparable] struct {
  11. lockIdSeq int64
  12. mu sync.Mutex
  13. locks map[T]*LockEntry
  14. locksInFlight map[T]int
  15. }
  16. type LockEntry struct {
  17. mu sync.Mutex
  18. waiters []*ActiveLock // ordered waiters that are blocked by exclusive locks
  19. activeSharedLockOwnerCount int32
  20. activeExclusiveLockOwnerCount int32
  21. cond *sync.Cond
  22. }
  23. type LockType int
  24. const (
  25. SharedLock LockType = iota
  26. ExclusiveLock
  27. )
  28. type ActiveLock struct {
  29. ID int64
  30. isDeleted bool
  31. intention string // for debugging
  32. lockType LockType
  33. }
  34. func NewLockTable[T comparable]() *LockTable[T] {
  35. return &LockTable[T]{
  36. locks: make(map[T]*LockEntry),
  37. locksInFlight: make(map[T]int),
  38. }
  39. }
  40. func (lt *LockTable[T]) NewActiveLock(intention string, lockType LockType) *ActiveLock {
  41. id := atomic.AddInt64(&lt.lockIdSeq, 1)
  42. l := &ActiveLock{ID: id, intention: intention, lockType: lockType}
  43. return l
  44. }
  45. func (lt *LockTable[T]) AcquireLock(intention string, key T, lockType LockType) (lock *ActiveLock) {
  46. lt.mu.Lock()
  47. // Get or create the lock entry for the key
  48. entry, exists := lt.locks[key]
  49. if !exists {
  50. entry = &LockEntry{}
  51. entry.cond = sync.NewCond(&entry.mu)
  52. lt.locks[key] = entry
  53. lt.locksInFlight[key] = 0
  54. }
  55. lt.locksInFlight[key]++
  56. lt.mu.Unlock()
  57. lock = lt.NewActiveLock(intention, lockType)
  58. // If the lock is held exclusively, wait
  59. entry.mu.Lock()
  60. if len(entry.waiters) > 0 || lockType == ExclusiveLock || entry.activeExclusiveLockOwnerCount > 0 {
  61. if glog.V(4) {
  62. fmt.Printf("ActiveLock %d %s wait for %+v type=%v with waiters %d active r%d w%d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeSharedLockOwnerCount, entry.activeExclusiveLockOwnerCount)
  63. if len(entry.waiters) > 0 {
  64. for _, waiter := range entry.waiters {
  65. fmt.Printf(" %d", waiter.ID)
  66. }
  67. fmt.Printf("\n")
  68. }
  69. }
  70. entry.waiters = append(entry.waiters, lock)
  71. if lockType == ExclusiveLock {
  72. for !lock.isDeleted && ((len(entry.waiters) > 0 && lock.ID != entry.waiters[0].ID) || entry.activeExclusiveLockOwnerCount > 0 || entry.activeSharedLockOwnerCount > 0) {
  73. entry.cond.Wait()
  74. }
  75. } else {
  76. for !lock.isDeleted && (len(entry.waiters) > 0 && lock.ID != entry.waiters[0].ID) || entry.activeExclusiveLockOwnerCount > 0 {
  77. entry.cond.Wait()
  78. }
  79. }
  80. // Remove the transaction from the waiters list
  81. if len(entry.waiters) > 0 && lock.ID == entry.waiters[0].ID {
  82. entry.waiters = entry.waiters[1:]
  83. entry.cond.Broadcast()
  84. }
  85. }
  86. // Otherwise, grant the lock
  87. if glog.V(4) {
  88. fmt.Printf("ActiveLock %d %s locked %+v type=%v with waiters %d active r%d w%d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeSharedLockOwnerCount, entry.activeExclusiveLockOwnerCount)
  89. if len(entry.waiters) > 0 {
  90. for _, waiter := range entry.waiters {
  91. fmt.Printf(" %d", waiter.ID)
  92. }
  93. fmt.Printf("\n")
  94. }
  95. }
  96. if lock.lockType == ExclusiveLock {
  97. entry.activeExclusiveLockOwnerCount++
  98. } else {
  99. entry.activeSharedLockOwnerCount++
  100. }
  101. entry.mu.Unlock()
  102. return lock
  103. }
  104. func (lt *LockTable[T]) ReleaseLock(key T, lock *ActiveLock) {
  105. lt.mu.Lock()
  106. defer lt.mu.Unlock()
  107. entry, exists := lt.locks[key]
  108. if !exists {
  109. return
  110. }
  111. lt.locksInFlight[key]--
  112. entry.mu.Lock()
  113. defer entry.mu.Unlock()
  114. // Remove the transaction from the waiters list
  115. for i, waiter := range entry.waiters {
  116. if waiter == lock {
  117. waiter.isDeleted = true
  118. entry.waiters = append(entry.waiters[:i], entry.waiters[i+1:]...)
  119. break
  120. }
  121. }
  122. if lock.lockType == ExclusiveLock {
  123. entry.activeExclusiveLockOwnerCount--
  124. } else {
  125. entry.activeSharedLockOwnerCount--
  126. }
  127. // If there are no waiters, release the lock
  128. if len(entry.waiters) == 0 && lt.locksInFlight[key] <= 0 && entry.activeExclusiveLockOwnerCount <= 0 && entry.activeSharedLockOwnerCount <= 0 {
  129. delete(lt.locks, key)
  130. delete(lt.locksInFlight, key)
  131. }
  132. if glog.V(4) {
  133. fmt.Printf("ActiveLock %d %s unlocked %+v type=%v with waiters %d active r%d w%d.\n", lock.ID, lock.intention, key, lock.lockType, len(entry.waiters), entry.activeSharedLockOwnerCount, entry.activeExclusiveLockOwnerCount)
  134. if len(entry.waiters) > 0 {
  135. for _, waiter := range entry.waiters {
  136. fmt.Printf(" %d", waiter.ID)
  137. }
  138. fmt.Printf("\n")
  139. }
  140. }
  141. // Notify the next waiter
  142. entry.cond.Broadcast()
  143. }
  144. func main() {
  145. }