Browse Source

master: fix negative active volumes (#7440)

* fix negative active volumes

* address comments

* simplify
master
Chris Lu 23 hours ago
committed by GitHub
parent
commit
ecdbe572ca
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 13
      weed/topology/disk.go
  2. 114
      weed/topology/topology_test.go

13
weed/topology/disk.go

@ -176,6 +176,19 @@ func (d *Disk) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChanged bool)
d.UpAdjustDiskUsageDelta(types.ToDiskType(v.DiskType), deltaDiskUsage)
}
isChanged = d.volumes[v.Id].ReadOnly != v.ReadOnly
if isChanged {
// Adjust active volume count when ReadOnly status changes
// Use a separate delta object to avoid affecting other metric adjustments
readOnlyDelta := &DiskUsageCounts{}
if v.ReadOnly {
// Changed from writable to read-only
readOnlyDelta.activeVolumeCount = -1
} else {
// Changed from read-only to writable
readOnlyDelta.activeVolumeCount = 1
}
d.UpAdjustDiskUsageDelta(types.ToDiskType(v.DiskType), readOnlyDelta)
}
d.volumes[v.Id] = v
}
return

114
weed/topology/topology_test.go

@ -211,6 +211,120 @@ func TestAddRemoveVolume(t *testing.T) {
}
}
func TestVolumeReadOnlyStatusChange(t *testing.T) {
topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
dc := topo.GetOrCreateDataCenter("dc1")
rack := dc.GetOrCreateRack("rack1")
maxVolumeCounts := make(map[string]uint32)
maxVolumeCounts[""] = 25
dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts)
// Create a writable volume
v := storage.VolumeInfo{
Id: needle.VolumeId(1),
Size: 100,
Collection: "",
DiskType: "",
FileCount: 10,
DeleteCount: 0,
DeletedByteCount: 0,
ReadOnly: false, // Initially writable
Version: needle.GetCurrentVersion(),
ReplicaPlacement: &super_block.ReplicaPlacement{},
Ttl: needle.EMPTY_TTL,
}
dn.UpdateVolumes([]storage.VolumeInfo{v})
topo.RegisterVolumeLayout(v, dn)
// Check initial active count (should be 1 since volume is writable)
usageCounts := topo.diskUsages.usages[types.HardDriveType]
assert(t, "initial activeVolumeCount", int(usageCounts.activeVolumeCount), 1)
assert(t, "initial remoteVolumeCount", int(usageCounts.remoteVolumeCount), 0)
// Change volume to read-only
v.ReadOnly = true
dn.UpdateVolumes([]storage.VolumeInfo{v})
// Check active count after marking read-only (should be 0)
usageCounts = topo.diskUsages.usages[types.HardDriveType]
assert(t, "activeVolumeCount after read-only", int(usageCounts.activeVolumeCount), 0)
// Change volume back to writable
v.ReadOnly = false
dn.UpdateVolumes([]storage.VolumeInfo{v})
// Check active count after marking writable again (should be 1)
usageCounts = topo.diskUsages.usages[types.HardDriveType]
assert(t, "activeVolumeCount after writable again", int(usageCounts.activeVolumeCount), 1)
}
func TestVolumeReadOnlyAndRemoteStatusChange(t *testing.T) {
topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
dc := topo.GetOrCreateDataCenter("dc1")
rack := dc.GetOrCreateRack("rack1")
maxVolumeCounts := make(map[string]uint32)
maxVolumeCounts[""] = 25
dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts)
// Create a writable, local volume
v := storage.VolumeInfo{
Id: needle.VolumeId(1),
Size: 100,
Collection: "",
DiskType: "",
FileCount: 10,
DeleteCount: 0,
DeletedByteCount: 0,
ReadOnly: false, // Initially writable
RemoteStorageName: "", // Initially local
Version: needle.GetCurrentVersion(),
ReplicaPlacement: &super_block.ReplicaPlacement{},
Ttl: needle.EMPTY_TTL,
}
dn.UpdateVolumes([]storage.VolumeInfo{v})
topo.RegisterVolumeLayout(v, dn)
// Check initial counts
usageCounts := topo.diskUsages.usages[types.HardDriveType]
assert(t, "initial activeVolumeCount", int(usageCounts.activeVolumeCount), 1)
assert(t, "initial remoteVolumeCount", int(usageCounts.remoteVolumeCount), 0)
// Simultaneously change to read-only AND remote
v.ReadOnly = true
v.RemoteStorageName = "s3"
v.RemoteStorageKey = "key1"
dn.UpdateVolumes([]storage.VolumeInfo{v})
// Check counts after both changes
usageCounts = topo.diskUsages.usages[types.HardDriveType]
assert(t, "activeVolumeCount after read-only+remote", int(usageCounts.activeVolumeCount), 0)
assert(t, "remoteVolumeCount after read-only+remote", int(usageCounts.remoteVolumeCount), 1)
// Change back to writable but keep remote
v.ReadOnly = false
dn.UpdateVolumes([]storage.VolumeInfo{v})
// Check counts - should be writable (active=1) and still remote
usageCounts = topo.diskUsages.usages[types.HardDriveType]
assert(t, "activeVolumeCount after writable+remote", int(usageCounts.activeVolumeCount), 1)
assert(t, "remoteVolumeCount after writable+remote", int(usageCounts.remoteVolumeCount), 1)
// Change back to local AND read-only simultaneously
v.ReadOnly = true
v.RemoteStorageName = ""
v.RemoteStorageKey = ""
dn.UpdateVolumes([]storage.VolumeInfo{v})
// Check final counts
usageCounts = topo.diskUsages.usages[types.HardDriveType]
assert(t, "final activeVolumeCount", int(usageCounts.activeVolumeCount), 0)
assert(t, "final remoteVolumeCount", int(usageCounts.remoteVolumeCount), 0)
}
func TestListCollections(t *testing.T) {
rp, _ := super_block.NewReplicaPlacementFromString("002")

Loading…
Cancel
Save