You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

176 lines
3.6 KiB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
  1. package lock_manager
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/pb"
  4. "github.com/seaweedfs/seaweedfs/weed/util"
  5. "sort"
  6. "sync"
  7. "time"
  8. )
  9. type LockRingSnapshot struct {
  10. servers []pb.ServerAddress
  11. ts time.Time
  12. }
  13. type LockRing struct {
  14. sync.RWMutex
  15. snapshots []*LockRingSnapshot
  16. candidateServers map[pb.ServerAddress]struct{}
  17. lastUpdateTime time.Time
  18. lastCompactTime time.Time
  19. snapshotInterval time.Duration
  20. onTakeSnapshot func(snapshot []pb.ServerAddress)
  21. }
  22. func NewLockRing(snapshotInterval time.Duration) *LockRing {
  23. return &LockRing{
  24. snapshotInterval: snapshotInterval,
  25. candidateServers: make(map[pb.ServerAddress]struct{}),
  26. }
  27. }
  28. func (r *LockRing) SetTakeSnapshotCallback(onTakeSnapshot func(snapshot []pb.ServerAddress)) {
  29. r.Lock()
  30. defer r.Unlock()
  31. r.onTakeSnapshot = onTakeSnapshot
  32. }
  33. // AddServer adds a server to the ring
  34. // if the previous snapshot passed the snapshot interval, create a new snapshot
  35. func (r *LockRing) AddServer(server pb.ServerAddress) {
  36. r.Lock()
  37. if _, found := r.candidateServers[server]; found {
  38. r.Unlock()
  39. return
  40. }
  41. r.lastUpdateTime = time.Now()
  42. r.candidateServers[server] = struct{}{}
  43. r.Unlock()
  44. r.takeSnapshotWithDelayedCompaction()
  45. }
  46. func (r *LockRing) RemoveServer(server pb.ServerAddress) {
  47. r.Lock()
  48. if _, found := r.candidateServers[server]; !found {
  49. r.Unlock()
  50. return
  51. }
  52. r.lastUpdateTime = time.Now()
  53. delete(r.candidateServers, server)
  54. r.Unlock()
  55. r.takeSnapshotWithDelayedCompaction()
  56. }
  57. func (r *LockRing) SetSnapshot(servers []pb.ServerAddress) {
  58. sort.Slice(servers, func(i, j int) bool {
  59. return servers[i] < servers[j]
  60. })
  61. r.Lock()
  62. r.lastUpdateTime = time.Now()
  63. r.Unlock()
  64. r.addOneSnapshot(servers)
  65. go func() {
  66. <-time.After(r.snapshotInterval)
  67. r.compactSnapshots()
  68. }()
  69. }
  70. func (r *LockRing) takeSnapshotWithDelayedCompaction() {
  71. r.doTakeSnapshot()
  72. go func() {
  73. <-time.After(r.snapshotInterval)
  74. r.compactSnapshots()
  75. }()
  76. }
  77. func (r *LockRing) doTakeSnapshot() {
  78. servers := r.getSortedServers()
  79. r.addOneSnapshot(servers)
  80. }
  81. func (r *LockRing) addOneSnapshot(servers []pb.ServerAddress) {
  82. r.Lock()
  83. defer r.Unlock()
  84. ts := time.Now()
  85. t := &LockRingSnapshot{
  86. servers: servers,
  87. ts: ts,
  88. }
  89. r.snapshots = append(r.snapshots, t)
  90. for i := len(r.snapshots) - 2; i >= 0; i-- {
  91. r.snapshots[i+1] = r.snapshots[i]
  92. }
  93. r.snapshots[0] = t
  94. if r.onTakeSnapshot != nil {
  95. r.onTakeSnapshot(t.servers)
  96. }
  97. }
  98. func (r *LockRing) compactSnapshots() {
  99. r.Lock()
  100. defer r.Unlock()
  101. if r.lastCompactTime.After(r.lastUpdateTime) {
  102. return
  103. }
  104. ts := time.Now()
  105. // remove old snapshots
  106. recentSnapshotIndex := 1
  107. for ; recentSnapshotIndex < len(r.snapshots); recentSnapshotIndex++ {
  108. if ts.Sub(r.snapshots[recentSnapshotIndex].ts) > r.snapshotInterval {
  109. break
  110. }
  111. }
  112. // keep the one that has been running for a while
  113. if recentSnapshotIndex+1 <= len(r.snapshots) {
  114. r.snapshots = r.snapshots[:recentSnapshotIndex+1]
  115. }
  116. r.lastCompactTime = ts
  117. }
  118. func (r *LockRing) getSortedServers() []pb.ServerAddress {
  119. sortedServers := make([]pb.ServerAddress, 0, len(r.candidateServers))
  120. for server := range r.candidateServers {
  121. sortedServers = append(sortedServers, server)
  122. }
  123. sort.Slice(sortedServers, func(i, j int) bool {
  124. return sortedServers[i] < sortedServers[j]
  125. })
  126. return sortedServers
  127. }
  128. func (r *LockRing) GetSnapshot() (servers []pb.ServerAddress) {
  129. r.RLock()
  130. defer r.RUnlock()
  131. if len(r.snapshots) == 0 {
  132. return
  133. }
  134. return r.snapshots[0].servers
  135. }
  136. func hashKeyToServer(key string, servers []pb.ServerAddress) pb.ServerAddress {
  137. if len(servers) == 0 {
  138. return ""
  139. }
  140. x := util.HashStringToLong(key)
  141. if x < 0 {
  142. x = -x
  143. }
  144. x = x % int64(len(servers))
  145. return servers[x]
  146. }