Browse Source

normal volume CompactionRevision

add-ec-vacuum
chrislu 5 months ago
parent
commit
a187f103d1
  1. 43
      weed/storage/store.go
  2. 3
      weed/storage/volume.go
  3. 3
      weed/storage/volume_loading.go
  4. 3
      weed/storage/volume_vacuum.go

43
weed/storage/store.go

@ -260,8 +260,9 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
var volumeMessages []*master_pb.VolumeInformationMessage var volumeMessages []*master_pb.VolumeInformationMessage
maxVolumeCounts := make(map[string]uint32) maxVolumeCounts := make(map[string]uint32)
var maxFileKey NeedleId var maxFileKey NeedleId
collectionVolumeSize := make(map[string]int64)
collectionVolumeDeletedBytes := make(map[string]int64)
// Track sizes by collection and compaction revision combination
collectionRevisionVolumeSize := make(map[string]map[uint16]int64)
collectionRevisionVolumeDeletedBytes := make(map[string]map[uint16]int64)
collectionVolumeReadOnlyCount := make(map[string]map[string]uint8) collectionVolumeReadOnlyCount := make(map[string]map[string]uint8)
for _, location := range s.Locations { for _, location := range s.Locations {
var deleteVids []needle.VolumeId var deleteVids []needle.VolumeId
@ -296,17 +297,24 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
} }
} }
if _, exist := collectionVolumeSize[v.Collection]; !exist {
collectionVolumeSize[v.Collection] = 0
collectionVolumeDeletedBytes[v.Collection] = 0
// Initialize collection+revision maps if needed
if collectionRevisionVolumeSize[v.Collection] == nil {
collectionRevisionVolumeSize[v.Collection] = make(map[uint16]int64)
} }
if collectionRevisionVolumeDeletedBytes[v.Collection] == nil {
collectionRevisionVolumeDeletedBytes[v.Collection] = make(map[uint16]int64)
}
if !shouldDeleteVolume { if !shouldDeleteVolume {
collectionVolumeSize[v.Collection] += int64(volumeMessage.Size)
collectionVolumeDeletedBytes[v.Collection] += int64(volumeMessage.DeletedByteCount)
collectionRevisionVolumeSize[v.Collection][v.CompactionRevision] += int64(volumeMessage.Size)
collectionRevisionVolumeDeletedBytes[v.Collection][v.CompactionRevision] += int64(volumeMessage.DeletedByteCount)
} else { } else {
collectionVolumeSize[v.Collection] -= int64(volumeMessage.Size)
if collectionVolumeSize[v.Collection] <= 0 {
delete(collectionVolumeSize, v.Collection)
collectionRevisionVolumeSize[v.Collection][v.CompactionRevision] -= int64(volumeMessage.Size)
if collectionRevisionVolumeSize[v.Collection][v.CompactionRevision] <= 0 {
delete(collectionRevisionVolumeSize[v.Collection], v.CompactionRevision)
if len(collectionRevisionVolumeSize[v.Collection]) == 0 {
delete(collectionRevisionVolumeSize, v.Collection)
}
} }
} }
@ -358,12 +366,19 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
uuidList = append(uuidList, loc.DirectoryUuid) uuidList = append(uuidList, loc.DirectoryUuid)
} }
for col, size := range collectionVolumeSize {
stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "normal", "0").Set(float64(size))
// Update metrics with compaction revision labels
for col, revisionSizes := range collectionRevisionVolumeSize {
for compactionRevision, size := range revisionSizes {
compactionRevisionLabel := fmt.Sprintf("%d", compactionRevision)
stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "normal", compactionRevisionLabel).Set(float64(size))
}
} }
for col, deletedBytes := range collectionVolumeDeletedBytes {
stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "deleted_bytes", "0").Set(float64(deletedBytes))
for col, revisionDeletedBytes := range collectionRevisionVolumeDeletedBytes {
for compactionRevision, deletedBytes := range revisionDeletedBytes {
compactionRevisionLabel := fmt.Sprintf("%d", compactionRevision)
stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "deleted_bytes", compactionRevisionLabel).Set(float64(deletedBytes))
}
} }
for col, types := range collectionVolumeReadOnlyCount { for col, types := range collectionVolumeReadOnlyCount {

3
weed/storage/volume.go

@ -248,7 +248,8 @@ func (v *Volume) doClose() {
glog.Warningf("Volume Close fail to sync volume %d", v.Id) glog.Warningf("Volume Close fail to sync volume %d", v.Id)
} }
v.DataBackend = nil v.DataBackend = nil
stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume", "0").Dec()
compactionRevisionLabel := fmt.Sprintf("%d", v.CompactionRevision)
stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume", compactionRevisionLabel).Dec()
} }
} }

3
weed/storage/volume_loading.go

@ -216,7 +216,8 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
} }
} }
stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume", "0").Inc()
compactionRevisionLabel := fmt.Sprintf("%d", v.CompactionRevision)
stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume", compactionRevisionLabel).Inc()
if err == nil { if err == nil {
hasLoadedVolume = true hasLoadedVolume = true

3
weed/storage/volume_vacuum.go

@ -136,7 +136,8 @@ func (v *Volume) CommitCompact() error {
} }
} }
v.DataBackend = nil v.DataBackend = nil
stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume", "0").Dec()
compactionRevisionLabel := fmt.Sprintf("%d", v.CompactionRevision)
stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume", compactionRevisionLabel).Dec()
var e error var e error
if e = v.makeupDiff(v.FileName(".cpd"), v.FileName(".cpx"), v.FileName(".dat"), v.FileName(".idx")); e != nil { if e = v.makeupDiff(v.FileName(".cpd"), v.FileName(".cpx"), v.FileName(".dat"), v.FileName(".idx")); e != nil {

Loading…
Cancel
Save