You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

163 lines
4.4 KiB

1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
  1. package util
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "sync"
  6. "sync/atomic"
  7. )
  8. // LockTable is a table of locks that can be acquired.
  9. // Locks are acquired in order of request.
  10. type LockTable[T comparable] struct {
  11. mu sync.Mutex
  12. locks map[T]*LockEntry
  13. lockIdSeq int64
  14. }
  15. type LockEntry struct {
  16. mu sync.Mutex
  17. waiters []*ActiveLock // ordered waiters that are blocked by exclusive locks
  18. activeSharedLockOwnerCount int32
  19. activeExclusiveLockOwnerCount int32
  20. lockType LockType
  21. cond *sync.Cond
  22. }
  23. type LockType int
  24. const (
  25. SharedLock LockType = iota
  26. ExclusiveLock
  27. )
  28. type ActiveLock struct {
  29. ID int64
  30. isDeleted bool
  31. intention string // for debugging
  32. lockType LockType
  33. }
  34. func NewLockTable[T comparable]() *LockTable[T] {
  35. return &LockTable[T]{
  36. locks: make(map[T]*LockEntry),
  37. }
  38. }
  39. func (lt *LockTable[T]) NewActiveLock(intention string, lockType LockType) *ActiveLock {
  40. id := atomic.AddInt64(&lt.lockIdSeq, 1)
  41. l := &ActiveLock{ID: id, intention: intention, lockType: lockType}
  42. return l
  43. }
  44. func (lt *LockTable[T]) AcquireLock(intention string, key T, lockType LockType) (lock *ActiveLock) {
  45. lt.mu.Lock()
  46. // Get or create the lock entry for the key
  47. entry, exists := lt.locks[key]
  48. if !exists {
  49. entry = &LockEntry{}
  50. entry.cond = sync.NewCond(&entry.mu)
  51. lt.locks[key] = entry
  52. }
  53. lt.mu.Unlock()
  54. lock = lt.NewActiveLock(intention, lockType)
  55. // If the lock is held exclusively, wait
  56. entry.mu.Lock()
  57. if len(entry.waiters) > 0 || lockType == ExclusiveLock || entry.activeExclusiveLockOwnerCount > 0 {
  58. if glog.V(4) {
  59. fmt.Printf("ActiveLock %d %s wait for %+v type=%v with waiters %d active r%d w%d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeSharedLockOwnerCount, entry.activeExclusiveLockOwnerCount)
  60. if len(entry.waiters) > 0 {
  61. for _, waiter := range entry.waiters {
  62. fmt.Printf(" %d", waiter.ID)
  63. }
  64. fmt.Printf("\n")
  65. }
  66. }
  67. entry.waiters = append(entry.waiters, lock)
  68. if lockType == ExclusiveLock {
  69. for !lock.isDeleted && ((len(entry.waiters) > 0 && lock.ID != entry.waiters[0].ID) || entry.activeExclusiveLockOwnerCount > 0 || entry.activeSharedLockOwnerCount > 0) {
  70. entry.cond.Wait()
  71. }
  72. } else {
  73. for !lock.isDeleted && (len(entry.waiters) > 0 && lock.ID != entry.waiters[0].ID) || entry.activeExclusiveLockOwnerCount > 0 {
  74. entry.cond.Wait()
  75. }
  76. }
  77. // Remove the transaction from the waiters list
  78. if len(entry.waiters) > 0 && lock.ID == entry.waiters[0].ID {
  79. entry.waiters = entry.waiters[1:]
  80. entry.cond.Broadcast()
  81. }
  82. }
  83. // Otherwise, grant the lock
  84. entry.lockType = lockType
  85. if glog.V(4) {
  86. fmt.Printf("ActiveLock %d %s locked %+v type=%v with waiters %d active r%d w%d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeSharedLockOwnerCount, entry.activeExclusiveLockOwnerCount)
  87. if len(entry.waiters) > 0 {
  88. for _, waiter := range entry.waiters {
  89. fmt.Printf(" %d", waiter.ID)
  90. }
  91. fmt.Printf("\n")
  92. }
  93. }
  94. if lock.lockType == ExclusiveLock {
  95. entry.activeExclusiveLockOwnerCount++
  96. } else {
  97. entry.activeSharedLockOwnerCount++
  98. }
  99. entry.mu.Unlock()
  100. return lock
  101. }
  102. func (lt *LockTable[T]) ReleaseLock(key T, lock *ActiveLock) {
  103. lt.mu.Lock()
  104. defer lt.mu.Unlock()
  105. entry, exists := lt.locks[key]
  106. if !exists {
  107. return
  108. }
  109. entry.mu.Lock()
  110. defer entry.mu.Unlock()
  111. // Remove the transaction from the waiters list
  112. for i, waiter := range entry.waiters {
  113. if waiter == lock {
  114. waiter.isDeleted = true
  115. entry.waiters = append(entry.waiters[:i], entry.waiters[i+1:]...)
  116. break
  117. }
  118. }
  119. // If there are no waiters, release the lock
  120. if len(entry.waiters) == 0 && entry.activeExclusiveLockOwnerCount <= 0 && entry.activeSharedLockOwnerCount <= 0 {
  121. delete(lt.locks, key)
  122. }
  123. if glog.V(4) {
  124. fmt.Printf("ActiveLock %d %s unlocked %+v type=%v with waiters %d active r%d w%d.\n", lock.ID, lock.intention, key, entry.lockType, len(entry.waiters), entry.activeSharedLockOwnerCount, entry.activeExclusiveLockOwnerCount)
  125. if len(entry.waiters) > 0 {
  126. for _, waiter := range entry.waiters {
  127. fmt.Printf(" %d", waiter.ID)
  128. }
  129. fmt.Printf("\n")
  130. }
  131. }
  132. if lock.lockType == ExclusiveLock {
  133. entry.activeExclusiveLockOwnerCount--
  134. } else {
  135. entry.activeSharedLockOwnerCount--
  136. }
  137. // Notify the next waiter
  138. entry.cond.Broadcast()
  139. }
  140. func main() {
  141. }