You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

153 lines
3.7 KiB

1 year ago
1 year ago
1 year ago
1 year ago
  1. package util
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "sync"
  6. "sync/atomic"
  7. )
  8. // LockTable is a table of locks that can be acquired.
  9. // Locks are acquired in order of request.
  10. type LockTable[T comparable] struct {
  11. mu sync.Mutex
  12. locks map[T]*LockEntry
  13. lockIdSeq int64
  14. }
  15. type LockEntry struct {
  16. mu sync.Mutex
  17. waiters []*ActiveLock // ordered waiters that are blocked by exclusive locks
  18. activeLockOwnerCount int32
  19. lockType LockType
  20. cond *sync.Cond
  21. }
  // LockType selects how a lock request interacts with other requests for
  // the same key.
  type LockType int

  const (
  	// SharedLock (0) is the type for requests that only have to wait their
  	// turn in the waiters queue.
  	SharedLock = LockType(iota)

  	// ExclusiveLock (1) is the type for requests that always queue and are
  	// granted only once no lock on the key is active.
  	ExclusiveLock
  )
  // ActiveLock is the handle for one lock request. AcquireLock returns it
  // and ReleaseLock takes it back.
  type ActiveLock struct {
  	// ID is the unique, increasing number assigned by NewActiveLock.
  	ID int64

  	// isDeleted is set by ReleaseLock when it removes this lock from a
  	// waiters queue; a waiter that observes it stops waiting.
  	isDeleted bool

  	// intention is a free-form label, for debugging.
  	intention string
  }
  32. func NewLockTable[T comparable]() *LockTable[T] {
  33. return &LockTable[T]{
  34. locks: make(map[T]*LockEntry),
  35. }
  36. }
  37. func (lt *LockTable[T]) NewActiveLock(intention string) *ActiveLock {
  38. id := atomic.AddInt64(&lt.lockIdSeq, 1)
  39. l := &ActiveLock{ID: id, intention: intention}
  40. return l
  41. }
  42. func (lt *LockTable[T]) AcquireLock(intention string, key T, lockType LockType) (lock *ActiveLock) {
  43. lt.mu.Lock()
  44. // Get or create the lock entry for the key
  45. entry, exists := lt.locks[key]
  46. if !exists {
  47. entry = &LockEntry{}
  48. entry.cond = sync.NewCond(&entry.mu)
  49. lt.locks[key] = entry
  50. }
  51. lt.mu.Unlock()
  52. lock = lt.NewActiveLock(intention)
  53. // If the lock is held exclusively, wait
  54. entry.mu.Lock()
  55. if len(entry.waiters) > 0 || lockType == ExclusiveLock {
  56. if glog.V(4) {
  57. fmt.Printf("ActiveLock %d %s wait for %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeLockOwnerCount)
  58. if len(entry.waiters) > 0 {
  59. for _, waiter := range entry.waiters {
  60. fmt.Printf(" %d", waiter.ID)
  61. }
  62. fmt.Printf("\n")
  63. }
  64. }
  65. entry.waiters = append(entry.waiters, lock)
  66. if lockType == ExclusiveLock {
  67. for !lock.isDeleted && ((len(entry.waiters) > 0 && lock.ID != entry.waiters[0].ID) || entry.activeLockOwnerCount > 0) {
  68. entry.cond.Wait()
  69. }
  70. } else {
  71. for !lock.isDeleted && (len(entry.waiters) > 0 && lock.ID != entry.waiters[0].ID) {
  72. entry.cond.Wait()
  73. }
  74. }
  75. // Remove the transaction from the waiters list
  76. if len(entry.waiters) > 0 && lock.ID == entry.waiters[0].ID {
  77. entry.waiters = entry.waiters[1:]
  78. entry.cond.Broadcast()
  79. }
  80. }
  81. // Otherwise, grant the lock
  82. entry.lockType = lockType
  83. if glog.V(4) {
  84. fmt.Printf("ActiveLock %d %s locked %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeLockOwnerCount)
  85. if len(entry.waiters) > 0 {
  86. for _, waiter := range entry.waiters {
  87. fmt.Printf(" %d", waiter.ID)
  88. }
  89. fmt.Printf("\n")
  90. }
  91. }
  92. entry.activeLockOwnerCount++
  93. entry.mu.Unlock()
  94. return lock
  95. }
  96. func (lt *LockTable[T]) ReleaseLock(key T, lock *ActiveLock) {
  97. lt.mu.Lock()
  98. defer lt.mu.Unlock()
  99. entry, exists := lt.locks[key]
  100. if !exists {
  101. return
  102. }
  103. entry.mu.Lock()
  104. defer entry.mu.Unlock()
  105. // Remove the transaction from the waiters list
  106. for i, waiter := range entry.waiters {
  107. if waiter == lock {
  108. waiter.isDeleted = true
  109. entry.waiters = append(entry.waiters[:i], entry.waiters[i+1:]...)
  110. break
  111. }
  112. }
  113. // If there are no waiters, release the lock
  114. if len(entry.waiters) == 0 {
  115. delete(lt.locks, key)
  116. }
  117. if glog.V(4) {
  118. fmt.Printf("ActiveLock %d %s unlocked %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, entry.lockType, len(entry.waiters), entry.activeLockOwnerCount)
  119. if len(entry.waiters) > 0 {
  120. for _, waiter := range entry.waiters {
  121. fmt.Printf(" %d", waiter.ID)
  122. }
  123. fmt.Printf("\n")
  124. }
  125. }
  126. entry.activeLockOwnerCount--
  127. // Notify the next waiter
  128. entry.cond.Broadcast()
  129. }
  // main has an empty body and does nothing when called.
  func main() {}