|
@ -1,12 +1,13 @@ |
|
|
package lock_manager |
|
|
package lock_manager |
|
|
|
|
|
|
|
|
import ( |
|
|
import ( |
|
|
"github.com/seaweedfs/seaweedfs/weed/glog" |
|
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb" |
|
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/util" |
|
|
|
|
|
"sort" |
|
|
"sort" |
|
|
"sync" |
|
|
"sync" |
|
|
"time" |
|
|
"time" |
|
|
|
|
|
|
|
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog" |
|
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb" |
|
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/util" |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
type LockRingSnapshot struct { |
|
|
type LockRingSnapshot struct { |
|
@ -22,6 +23,7 @@ type LockRing struct { |
|
|
lastCompactTime time.Time |
|
|
lastCompactTime time.Time |
|
|
snapshotInterval time.Duration |
|
|
snapshotInterval time.Duration |
|
|
onTakeSnapshot func(snapshot []pb.ServerAddress) |
|
|
onTakeSnapshot func(snapshot []pb.ServerAddress) |
|
|
|
|
|
cleanupWg sync.WaitGroup |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func NewLockRing(snapshotInterval time.Duration) *LockRing { |
|
|
func NewLockRing(snapshotInterval time.Duration) *LockRing { |
|
@ -87,7 +89,9 @@ func (r *LockRing) SetSnapshot(servers []pb.ServerAddress) { |
|
|
|
|
|
|
|
|
r.addOneSnapshot(servers) |
|
|
r.addOneSnapshot(servers) |
|
|
|
|
|
|
|
|
|
|
|
r.cleanupWg.Add(1) |
|
|
go func() { |
|
|
go func() { |
|
|
|
|
|
defer r.cleanupWg.Done() |
|
|
<-time.After(r.snapshotInterval) |
|
|
<-time.After(r.snapshotInterval) |
|
|
r.compactSnapshots() |
|
|
r.compactSnapshots() |
|
|
}() |
|
|
}() |
|
@ -96,7 +100,9 @@ func (r *LockRing) SetSnapshot(servers []pb.ServerAddress) { |
|
|
func (r *LockRing) takeSnapshotWithDelayedCompaction() { |
|
|
func (r *LockRing) takeSnapshotWithDelayedCompaction() { |
|
|
r.doTakeSnapshot() |
|
|
r.doTakeSnapshot() |
|
|
|
|
|
|
|
|
|
|
|
r.cleanupWg.Add(1) |
|
|
go func() { |
|
|
go func() { |
|
|
|
|
|
defer r.cleanupWg.Done() |
|
|
<-time.After(r.snapshotInterval) |
|
|
<-time.After(r.snapshotInterval) |
|
|
r.compactSnapshots() |
|
|
r.compactSnapshots() |
|
|
}() |
|
|
}() |
|
@ -172,6 +178,12 @@ func (r *LockRing) GetSnapshot() (servers []pb.ServerAddress) { |
|
|
return r.snapshots[0].servers |
|
|
return r.snapshots[0].servers |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// WaitForCleanup waits for all pending cleanup operations to complete
|
|
|
|
|
|
// This is useful for testing to ensure deterministic behavior
|
|
|
|
|
|
func (r *LockRing) WaitForCleanup() { |
|
|
|
|
|
r.cleanupWg.Wait() |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
func hashKeyToServer(key string, servers []pb.ServerAddress) pb.ServerAddress { |
|
|
func hashKeyToServer(key string, servers []pb.ServerAddress) pb.ServerAddress { |
|
|
if len(servers) == 0 { |
|
|
if len(servers) == 0 { |
|
|
return "" |
|
|
return "" |
|
|