diff --git a/weed/storage/store.go b/weed/storage/store.go index 72a388379..2cd237fcb 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -206,11 +206,12 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { maxVolumeCount = maxVolumeCount + location.MaxVolumeCount location.volumesLock.RLock() for _, v := range location.volumes { - if maxFileKey < v.MaxFileKey() { - maxFileKey = v.MaxFileKey() + curMaxFileKey, volumeMessage := v.ToVolumeInformationMessage() + if maxFileKey < curMaxFileKey { + maxFileKey = curMaxFileKey } - if !v.expired(s.GetVolumeSizeLimit()) { - volumeMessages = append(volumeMessages, v.ToVolumeInformationMessage()) + if !v.expired(volumeMessage.Size, s.GetVolumeSizeLimit()) { + volumeMessages = append(volumeMessages, volumeMessage) } else { if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) { deleteVids = append(deleteVids, v.Id) @@ -218,8 +219,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { glog.V(0).Infoln("volume", v.Id, "is expired.") } } - fileSize, _, _ := v.FileStat() - collectionVolumeSize[v.Collection] += fileSize + collectionVolumeSize[v.Collection] += volumeMessage.Size if v.IsReadOnly() { collectionVolumeReadOnlyCount[v.Collection] += 1 } diff --git a/weed/storage/volume.go b/weed/storage/volume.go index 2d46fbcdf..a7a963a59 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -178,12 +178,12 @@ func (v *Volume) NeedToReplicate() bool { // except when volume is empty // or when the volume does not have a ttl // or when volumeSizeLimit is 0 when server just starts -func (v *Volume) expired(volumeSizeLimit uint64) bool { +func (v *Volume) expired(contentSize uint64, volumeSizeLimit uint64) bool { if volumeSizeLimit == 0 { // skip if we don't know size limit return false } - if v.ContentSize() == 0 { + if contentSize <= super_block.SuperBlockSize { return false } if v.Ttl == nil || v.Ttl.Minutes() == 0 { @@ -214,16 +214,32 @@ func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool { return false } -func (v *Volume) ToVolumeInformationMessage() *master_pb.VolumeInformationMessage { - size, _, modTime := v.FileStat() +func (v *Volume) CollectStatus() (maxFileKey types.NeedleId, datFileSize int64, modTime time.Time, fileCount, deletedCount, deletedSize uint64) { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + glog.V(3).Infof("CollectStatus volume %d", v.Id) + + maxFileKey = v.nm.MaxFileKey() + datFileSize, modTime, _ = v.DataBackend.GetStat() + fileCount = uint64(v.nm.FileCount()) + deletedCount = uint64(v.nm.DeletedCount()) + deletedSize = v.nm.DeletedSize() + fileCount = uint64(v.nm.FileCount()) + + return +} + +func (v *Volume) ToVolumeInformationMessage() (types.NeedleId, *master_pb.VolumeInformationMessage) { + + maxFileKey, volumeSize, modTime, fileCount, deletedCount, deletedSize := v.CollectStatus() - volumInfo := &master_pb.VolumeInformationMessage{ + volumeInfo := &master_pb.VolumeInformationMessage{ Id: uint32(v.Id), - Size: size, + Size: uint64(volumeSize), Collection: v.Collection, - FileCount: v.FileCount(), - DeleteCount: v.DeletedCount(), - DeletedByteCount: v.DeletedSize(), + FileCount: fileCount, + DeleteCount: deletedCount, + DeletedByteCount: deletedSize, ReadOnly: v.IsReadOnly(), ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), Version: uint32(v.Version()), @@ -232,9 +248,9 @@ func (v *Volume) ToVolumeInformationMessage() *master_pb.VolumeInformationMessag ModifiedAtSecond: modTime.Unix(), } - volumInfo.RemoteStorageName, volumInfo.RemoteStorageKey = v.RemoteStorageNameKey() + volumeInfo.RemoteStorageName, volumeInfo.RemoteStorageKey = v.RemoteStorageNameKey() - return volumInfo + return maxFileKey, volumeInfo } func (v *Volume) RemoteStorageNameKey() (storageName, storageKey string) {