From a187f103d1a238f74c88ac14f06d3d1eb50f05ca Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 10 Aug 2025 16:22:36 -0700 Subject: [PATCH] normal volume CompactionRevision --- weed/storage/store.go | 43 +++++++++++++++++++++++----------- weed/storage/volume.go | 3 ++- weed/storage/volume_loading.go | 3 ++- weed/storage/volume_vacuum.go | 3 ++- 4 files changed, 35 insertions(+), 17 deletions(-) diff --git a/weed/storage/store.go b/weed/storage/store.go index 7008c4995..83ddd7dc6 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -260,8 +260,9 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { var volumeMessages []*master_pb.VolumeInformationMessage maxVolumeCounts := make(map[string]uint32) var maxFileKey NeedleId - collectionVolumeSize := make(map[string]int64) - collectionVolumeDeletedBytes := make(map[string]int64) + // Track sizes by collection and compaction revision combination + collectionRevisionVolumeSize := make(map[string]map[uint16]int64) + collectionRevisionVolumeDeletedBytes := make(map[string]map[uint16]int64) collectionVolumeReadOnlyCount := make(map[string]map[string]uint8) for _, location := range s.Locations { var deleteVids []needle.VolumeId @@ -296,17 +297,24 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { } } - if _, exist := collectionVolumeSize[v.Collection]; !exist { - collectionVolumeSize[v.Collection] = 0 - collectionVolumeDeletedBytes[v.Collection] = 0 + // Initialize collection+revision maps if needed + if collectionRevisionVolumeSize[v.Collection] == nil { + collectionRevisionVolumeSize[v.Collection] = make(map[uint16]int64) } + if collectionRevisionVolumeDeletedBytes[v.Collection] == nil { + collectionRevisionVolumeDeletedBytes[v.Collection] = make(map[uint16]int64) + } + if !shouldDeleteVolume { - collectionVolumeSize[v.Collection] += int64(volumeMessage.Size) - collectionVolumeDeletedBytes[v.Collection] += int64(volumeMessage.DeletedByteCount) + collectionRevisionVolumeSize[v.Collection][v.CompactionRevision] += int64(volumeMessage.Size) + collectionRevisionVolumeDeletedBytes[v.Collection][v.CompactionRevision] += int64(volumeMessage.DeletedByteCount) } else { - collectionVolumeSize[v.Collection] -= int64(volumeMessage.Size) - if collectionVolumeSize[v.Collection] <= 0 { - delete(collectionVolumeSize, v.Collection) + collectionRevisionVolumeSize[v.Collection][v.CompactionRevision] -= int64(volumeMessage.Size) + if collectionRevisionVolumeSize[v.Collection][v.CompactionRevision] <= 0 { + delete(collectionRevisionVolumeSize[v.Collection], v.CompactionRevision) + if len(collectionRevisionVolumeSize[v.Collection]) == 0 { + delete(collectionRevisionVolumeSize, v.Collection) + } } } @@ -358,12 +366,19 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { uuidList = append(uuidList, loc.DirectoryUuid) } - for col, size := range collectionVolumeSize { - stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "normal", "0").Set(float64(size)) + // Update metrics with compaction revision labels + for col, revisionSizes := range collectionRevisionVolumeSize { + for compactionRevision, size := range revisionSizes { + compactionRevisionLabel := fmt.Sprintf("%d", compactionRevision) + stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "normal", compactionRevisionLabel).Set(float64(size)) + } } - for col, deletedBytes := range collectionVolumeDeletedBytes { - stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "deleted_bytes", "0").Set(float64(deletedBytes)) + for col, revisionDeletedBytes := range collectionRevisionVolumeDeletedBytes { + for compactionRevision, deletedBytes := range revisionDeletedBytes { + compactionRevisionLabel := fmt.Sprintf("%d", compactionRevision) + stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "deleted_bytes", compactionRevisionLabel).Set(float64(deletedBytes)) + } } for col, types := range collectionVolumeReadOnlyCount { diff --git a/weed/storage/volume.go b/weed/storage/volume.go index 1253bafc2..7143cffbe 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -248,7 +248,8 @@ func (v *Volume) doClose() { glog.Warningf("Volume Close fail to sync volume %d", v.Id) } v.DataBackend = nil - stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume", "0").Dec() + compactionRevisionLabel := fmt.Sprintf("%d", v.CompactionRevision) + stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume", compactionRevisionLabel).Dec() } } diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index 88fbab1f0..8f81724e1 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -216,7 +216,8 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind } } - stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume", "0").Inc() + compactionRevisionLabel := fmt.Sprintf("%d", v.CompactionRevision) + stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume", compactionRevisionLabel).Inc() if err == nil { hasLoadedVolume = true diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index cea9b0dac..e181e4d71 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -136,7 +136,8 @@ func (v *Volume) CommitCompact() error { } } v.DataBackend = nil - stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume", "0").Dec() + compactionRevisionLabel := fmt.Sprintf("%d", v.CompactionRevision) + stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume", compactionRevisionLabel).Dec() var e error if e = v.makeupDiff(v.FileName(".cpd"), v.FileName(".cpx"), v.FileName(".dat"), v.FileName(".idx")); e != nil {