You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

147 lines
3.6 KiB

1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
  1. package util
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "sync"
  6. "sync/atomic"
  7. )
  8. // LockTable is a table of locks that can be acquired.
  9. // Locks are acquired in order of request.
  10. type LockTable[T comparable] struct {
  11. mu sync.Mutex
  12. locks map[T]*LockEntry
  13. lockIdSeq int64
  14. }
  15. type LockEntry struct {
  16. mu sync.Mutex
  17. waiters []*ActiveLock // ordered waiters that are blocked by exclusive locks
  18. activeLockOwnerCount int32
  19. lockType LockType
  20. cond *sync.Cond
  21. }
  // LockType selects the mode in which a lock is requested.
  type LockType int

  // Lock modes accepted by LockTable.AcquireLock.
  const (
  	// SharedLock is the zero value of LockType.
  	SharedLock LockType = iota
  	// ExclusiveLock requests sole ownership: the request is always queued and
  	// only granted once no other owner is active.
  	ExclusiveLock
  )
  // ActiveLock is the handle for one lock request. AcquireLock returns it and
  // ReleaseLock takes it back.
  type ActiveLock struct {
  	// ID is the sequence number assigned by LockTable.NewActiveLock.
  	ID int64
  	// isDeleted is set by ReleaseLock when the request is removed from a wait
  	// queue; a request waiting in AcquireLock stops waiting once it is set.
  	isDeleted bool
  	// intention is a free-form label that only appears in debug log output.
  	intention string
  }
  32. func NewLockTable[T comparable]() *LockTable[T] {
  33. return &LockTable[T]{
  34. locks: make(map[T]*LockEntry),
  35. }
  36. }
  37. func (lt *LockTable[T]) NewActiveLock(intention string) *ActiveLock {
  38. id := atomic.AddInt64(&lt.lockIdSeq, 1)
  39. l := &ActiveLock{ID: id, intention: intention}
  40. return l
  41. }
  42. func (lt *LockTable[T]) AcquireLock(intention string, key T, lockType LockType) (lock *ActiveLock) {
  43. lt.mu.Lock()
  44. // Get or create the lock entry for the key
  45. entry, exists := lt.locks[key]
  46. if !exists {
  47. entry = &LockEntry{}
  48. entry.cond = sync.NewCond(&entry.mu)
  49. lt.locks[key] = entry
  50. }
  51. lt.mu.Unlock()
  52. lock = lt.NewActiveLock(intention)
  53. // If the lock is held exclusively, wait
  54. entry.mu.Lock()
  55. if len(entry.waiters) > 0 || lockType == ExclusiveLock {
  56. glog.V(4).Infof("ActiveLock %d %s wait for %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeLockOwnerCount)
  57. if glog.V(4) && len(entry.waiters) > 0 {
  58. for _, waiter := range entry.waiters {
  59. fmt.Printf(" %d", waiter.ID)
  60. }
  61. fmt.Printf("\n")
  62. }
  63. entry.waiters = append(entry.waiters, lock)
  64. if lockType == ExclusiveLock {
  65. for !lock.isDeleted && ((len(entry.waiters) > 0 && lock.ID != entry.waiters[0].ID) || entry.activeLockOwnerCount > 0) {
  66. entry.cond.Wait()
  67. }
  68. } else {
  69. for !lock.isDeleted && (len(entry.waiters) > 0 && lock.ID != entry.waiters[0].ID) {
  70. entry.cond.Wait()
  71. }
  72. }
  73. // Remove the transaction from the waiters list
  74. if len(entry.waiters) > 0 && lock.ID == entry.waiters[0].ID {
  75. entry.waiters = entry.waiters[1:]
  76. entry.cond.Broadcast()
  77. }
  78. }
  79. entry.activeLockOwnerCount++
  80. // Otherwise, grant the lock
  81. entry.lockType = lockType
  82. glog.V(4).Infof("ActiveLock %d %s locked %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeLockOwnerCount)
  83. if glog.V(4) && len(entry.waiters) > 0 {
  84. for _, waiter := range entry.waiters {
  85. fmt.Printf(" %d", waiter.ID)
  86. }
  87. fmt.Printf("\n")
  88. }
  89. entry.mu.Unlock()
  90. return lock
  91. }
  92. func (lt *LockTable[T]) ReleaseLock(key T, lock *ActiveLock) {
  93. lt.mu.Lock()
  94. defer lt.mu.Unlock()
  95. entry, exists := lt.locks[key]
  96. if !exists {
  97. return
  98. }
  99. entry.mu.Lock()
  100. defer entry.mu.Unlock()
  101. // Remove the transaction from the waiters list
  102. for i, waiter := range entry.waiters {
  103. if waiter == lock {
  104. waiter.isDeleted = true
  105. entry.waiters = append(entry.waiters[:i], entry.waiters[i+1:]...)
  106. break
  107. }
  108. }
  109. // If there are no waiters, release the lock
  110. if len(entry.waiters) == 0 {
  111. delete(lt.locks, key)
  112. }
  113. glog.V(4).Infof("ActiveLock %d %s unlocked %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, entry.lockType, len(entry.waiters), entry.activeLockOwnerCount)
  114. if len(entry.waiters) > 0 {
  115. for _, waiter := range entry.waiters {
  116. fmt.Printf(" %d", waiter.ID)
  117. }
  118. fmt.Printf("\n")
  119. }
  120. entry.activeLockOwnerCount--
  121. // Notify the next waiter
  122. entry.cond.Broadcast()
  123. }
  // main is intentionally empty; it does nothing when called.
  func main() {}