Browse Source

reduce locks

pull/1560/head
Chris Lu 4 years ago
parent
commit
9104cfa744
  1. 12
      weed/storage/store.go
  2. 38
      weed/storage/volume.go

12
weed/storage/store.go

@ -206,11 +206,12 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
location.volumesLock.RLock() location.volumesLock.RLock()
for _, v := range location.volumes { for _, v := range location.volumes {
if maxFileKey < v.MaxFileKey() {
maxFileKey = v.MaxFileKey()
curMaxFileKey, volumeMessage := v.ToVolumeInformationMessage()
if maxFileKey < curMaxFileKey {
maxFileKey = curMaxFileKey
} }
if !v.expired(s.GetVolumeSizeLimit()) {
volumeMessages = append(volumeMessages, v.ToVolumeInformationMessage())
if !v.expired(volumeMessage.Size, s.GetVolumeSizeLimit()) {
volumeMessages = append(volumeMessages, volumeMessage)
} else { } else {
if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) { if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
deleteVids = append(deleteVids, v.Id) deleteVids = append(deleteVids, v.Id)
@ -218,8 +219,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
glog.V(0).Infoln("volume", v.Id, "is expired.") glog.V(0).Infoln("volume", v.Id, "is expired.")
} }
} }
fileSize, _, _ := v.FileStat()
collectionVolumeSize[v.Collection] += fileSize
collectionVolumeSize[v.Collection] += volumeMessage.Size
if v.IsReadOnly() { if v.IsReadOnly() {
collectionVolumeReadOnlyCount[v.Collection] += 1 collectionVolumeReadOnlyCount[v.Collection] += 1
} }

38
weed/storage/volume.go

@ -178,12 +178,12 @@ func (v *Volume) NeedToReplicate() bool {
// except when volume is empty // except when volume is empty
// or when the volume does not have a ttl // or when the volume does not have a ttl
// or when volumeSizeLimit is 0 when server just starts // or when volumeSizeLimit is 0 when server just starts
func (v *Volume) expired(volumeSizeLimit uint64) bool {
func (v *Volume) expired(contentSize uint64, volumeSizeLimit uint64) bool {
if volumeSizeLimit == 0 { if volumeSizeLimit == 0 {
// skip if we don't know size limit // skip if we don't know size limit
return false return false
} }
if v.ContentSize() == 0 {
if contentSize <= super_block.SuperBlockSize {
return false return false
} }
if v.Ttl == nil || v.Ttl.Minutes() == 0 { if v.Ttl == nil || v.Ttl.Minutes() == 0 {
@ -214,16 +214,32 @@ func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool {
return false return false
} }
func (v *Volume) ToVolumeInformationMessage() *master_pb.VolumeInformationMessage {
size, _, modTime := v.FileStat()
func (v *Volume) CollectStatus() (maxFileKey types.NeedleId, datFileSize int64, modTime time.Time, fileCount, deletedCount, deletedSize uint64) {
v.dataFileAccessLock.RLock()
defer v.dataFileAccessLock.RUnlock()
glog.V(3).Infof("CollectStatus volume %d", v.Id)
maxFileKey = v.nm.MaxFileKey()
datFileSize, modTime, _ = v.DataBackend.GetStat()
fileCount = uint64(v.nm.FileCount())
deletedCount = uint64(v.nm.DeletedCount())
deletedSize = v.nm.DeletedSize()
fileCount = uint64(v.nm.FileCount())
return
}
func (v *Volume) ToVolumeInformationMessage() (types.NeedleId, *master_pb.VolumeInformationMessage) {
maxFileKey, volumeSize, modTime, fileCount, deletedCount, deletedSize := v.CollectStatus()
volumInfo := &master_pb.VolumeInformationMessage{
volumeInfo := &master_pb.VolumeInformationMessage{
Id: uint32(v.Id), Id: uint32(v.Id),
Size: size,
Size: uint64(volumeSize),
Collection: v.Collection, Collection: v.Collection,
FileCount: v.FileCount(),
DeleteCount: v.DeletedCount(),
DeletedByteCount: v.DeletedSize(),
FileCount: fileCount,
DeleteCount: deletedCount,
DeletedByteCount: deletedSize,
ReadOnly: v.IsReadOnly(), ReadOnly: v.IsReadOnly(),
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
Version: uint32(v.Version()), Version: uint32(v.Version()),
@ -232,9 +248,9 @@ func (v *Volume) ToVolumeInformationMessage() *master_pb.VolumeInformationMessag
ModifiedAtSecond: modTime.Unix(), ModifiedAtSecond: modTime.Unix(),
} }
volumInfo.RemoteStorageName, volumInfo.RemoteStorageKey = v.RemoteStorageNameKey()
volumeInfo.RemoteStorageName, volumeInfo.RemoteStorageKey = v.RemoteStorageNameKey()
return volumInfo
return maxFileKey, volumeInfo
} }
func (v *Volume) RemoteStorageNameKey() (storageName, storageKey string) { func (v *Volume) RemoteStorageNameKey() (storageName, storageKey string) {

Loading…
Cancel
Save