Browse Source

Merge pull request #1 from chrislusf/master

volume: protect against nil needle map
pull/1041/head
xushuxun 6 years ago
committed by GitHub
parent
commit
3d01510b02
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 30
      weed/storage/needle_map_metric.go
  2. 10
      weed/storage/store.go
  3. 56
      weed/storage/volume.go
  4. 5
      weed/storage/volume_backup.go
  5. 4
      weed/storage/volume_read_write.go
  6. 4
      weed/storage/volume_vacuum.go

30
weed/storage/needle_map_metric.go

@ -19,10 +19,16 @@ type mapMetric struct {
} }
func (mm *mapMetric) logDelete(deletedByteCount uint32) { func (mm *mapMetric) logDelete(deletedByteCount uint32) {
if mm == nil {
return
}
mm.LogDeletionCounter(deletedByteCount) mm.LogDeletionCounter(deletedByteCount)
} }
func (mm *mapMetric) logPut(key NeedleId, oldSize uint32, newSize uint32) { func (mm *mapMetric) logPut(key NeedleId, oldSize uint32, newSize uint32) {
if mm == nil {
return
}
mm.MaybeSetMaxFileKey(key) mm.MaybeSetMaxFileKey(key)
mm.LogFileCounter(newSize) mm.LogFileCounter(newSize)
if oldSize > 0 && oldSize != TombstoneFileSize { if oldSize > 0 && oldSize != TombstoneFileSize {
@ -30,32 +36,56 @@ func (mm *mapMetric) logPut(key NeedleId, oldSize uint32, newSize uint32) {
} }
} }
func (mm *mapMetric) LogFileCounter(newSize uint32) { func (mm *mapMetric) LogFileCounter(newSize uint32) {
if mm == nil {
return
}
atomic.AddUint32(&mm.FileCounter, 1) atomic.AddUint32(&mm.FileCounter, 1)
atomic.AddUint64(&mm.FileByteCounter, uint64(newSize)) atomic.AddUint64(&mm.FileByteCounter, uint64(newSize))
} }
func (mm *mapMetric) LogDeletionCounter(oldSize uint32) { func (mm *mapMetric) LogDeletionCounter(oldSize uint32) {
if mm == nil {
return
}
if oldSize > 0 { if oldSize > 0 {
atomic.AddUint32(&mm.DeletionCounter, 1) atomic.AddUint32(&mm.DeletionCounter, 1)
atomic.AddUint64(&mm.DeletionByteCounter, uint64(oldSize)) atomic.AddUint64(&mm.DeletionByteCounter, uint64(oldSize))
} }
} }
func (mm *mapMetric) ContentSize() uint64 { func (mm *mapMetric) ContentSize() uint64 {
if mm == nil {
return 0
}
return atomic.LoadUint64(&mm.FileByteCounter) return atomic.LoadUint64(&mm.FileByteCounter)
} }
func (mm *mapMetric) DeletedSize() uint64 { func (mm *mapMetric) DeletedSize() uint64 {
if mm == nil {
return 0
}
return atomic.LoadUint64(&mm.DeletionByteCounter) return atomic.LoadUint64(&mm.DeletionByteCounter)
} }
func (mm *mapMetric) FileCount() int { func (mm *mapMetric) FileCount() int {
if mm == nil {
return 0
}
return int(atomic.LoadUint32(&mm.FileCounter)) return int(atomic.LoadUint32(&mm.FileCounter))
} }
func (mm *mapMetric) DeletedCount() int { func (mm *mapMetric) DeletedCount() int {
if mm == nil {
return 0
}
return int(atomic.LoadUint32(&mm.DeletionCounter)) return int(atomic.LoadUint32(&mm.DeletionCounter))
} }
func (mm *mapMetric) MaxFileKey() NeedleId { func (mm *mapMetric) MaxFileKey() NeedleId {
if mm == nil {
return 0
}
t := uint64(mm.MaximumFileKey) t := uint64(mm.MaximumFileKey)
return Uint64ToNeedleId(t) return Uint64ToNeedleId(t)
} }
func (mm *mapMetric) MaybeSetMaxFileKey(key NeedleId) { func (mm *mapMetric) MaybeSetMaxFileKey(key NeedleId) {
if mm == nil {
return
}
if key > mm.MaxFileKey() { if key > mm.MaxFileKey() {
atomic.StoreUint64(&mm.MaximumFileKey, uint64(key)) atomic.StoreUint64(&mm.MaximumFileKey, uint64(key))
} }

10
weed/storage/store.go

@ -137,9 +137,9 @@ func (s *Store) Status() []*VolumeInfo {
Collection: v.Collection, Collection: v.Collection,
ReplicaPlacement: v.ReplicaPlacement, ReplicaPlacement: v.ReplicaPlacement,
Version: v.Version(), Version: v.Version(),
FileCount: v.nm.FileCount(),
DeleteCount: v.nm.DeletedCount(),
DeletedByteCount: v.nm.DeletedSize(),
FileCount: int(v.FileCount()),
DeleteCount: int(v.DeletedCount()),
DeletedByteCount: v.DeletedSize(),
ReadOnly: v.readOnly, ReadOnly: v.readOnly,
Ttl: v.Ttl, Ttl: v.Ttl,
CompactRevision: uint32(v.CompactionRevision), CompactRevision: uint32(v.CompactionRevision),
@ -168,8 +168,8 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
location.Lock() location.Lock()
for _, v := range location.volumes { for _, v := range location.volumes {
if maxFileKey < v.nm.MaxFileKey() {
maxFileKey = v.nm.MaxFileKey()
if maxFileKey < v.MaxFileKey() {
maxFileKey = v.MaxFileKey()
} }
if !v.expired(s.GetVolumeSizeLimit()) { if !v.expired(s.GetVolumeSizeLimit()) {
volumeMessages = append(volumeMessages, v.ToVolumeInformationMessage()) volumeMessages = append(volumeMessages, v.ToVolumeInformationMessage())

56
weed/storage/volume.go

@ -6,6 +6,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"os" "os"
"path" "path"
@ -85,14 +86,54 @@ func (v *Volume) FileStat() (datSize uint64, idxSize uint64, modTime time.Time)
return // -1 causes integer overflow and the volume to become unwritable. return // -1 causes integer overflow and the volume to become unwritable.
} }
func (v *Volume) IndexFileSize() uint64 {
return v.nm.IndexFileSize()
func (v *Volume) ContentSize() uint64 {
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
return v.nm.ContentSize()
}
func (v *Volume) DeletedSize() uint64 {
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
return v.nm.DeletedSize()
} }
func (v *Volume) FileCount() uint64 { func (v *Volume) FileCount() uint64 {
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
return uint64(v.nm.FileCount()) return uint64(v.nm.FileCount())
} }
func (v *Volume) DeletedCount() uint64 {
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
return uint64(v.nm.DeletedCount())
}
func (v *Volume) MaxFileKey() types.NeedleId {
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
return v.nm.MaxFileKey()
}
func (v *Volume) IndexFileSize() uint64 {
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
return v.nm.IndexFileSize()
}
func (v *Volume) IndexFileContent() ([]byte, error) {
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
return v.nm.IndexFileContent()
}
func (v *Volume) IndexFileName() string {
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
return v.nm.IndexFileName()
}
// Close cleanly shuts down this volume // Close cleanly shuts down this volume
func (v *Volume) Close() { func (v *Volume) Close() {
v.dataFileAccessLock.Lock() v.dataFileAccessLock.Lock()
@ -112,10 +153,6 @@ func (v *Volume) NeedToReplicate() bool {
return v.ReplicaPlacement.GetCopyCount() > 1 return v.ReplicaPlacement.GetCopyCount() > 1
} }
func (v *Volume) ContentSize() uint64 {
return v.nm.ContentSize()
}
// volume is expired if modified time + volume ttl < now // volume is expired if modified time + volume ttl < now
// except when volume is empty // except when volume is empty
// or when the volume does not have a ttl // or when the volume does not have a ttl
@ -158,13 +195,14 @@ func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool {
func (v *Volume) ToVolumeInformationMessage() *master_pb.VolumeInformationMessage { func (v *Volume) ToVolumeInformationMessage() *master_pb.VolumeInformationMessage {
size, _, modTime := v.FileStat() size, _, modTime := v.FileStat()
return &master_pb.VolumeInformationMessage{ return &master_pb.VolumeInformationMessage{
Id: uint32(v.Id), Id: uint32(v.Id),
Size: size, Size: size,
Collection: v.Collection, Collection: v.Collection,
FileCount: uint64(v.nm.FileCount()),
DeleteCount: uint64(v.nm.DeletedCount()),
DeletedByteCount: v.nm.DeletedSize(),
FileCount: uint64(v.FileCount()),
DeleteCount: uint64(v.DeletedCount()),
DeletedByteCount: v.DeletedSize(),
ReadOnly: v.readOnly, ReadOnly: v.readOnly,
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
Version: uint32(v.Version()), Version: uint32(v.Version()),

5
weed/storage/volume_backup.go

@ -15,12 +15,15 @@ import (
) )
func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusResponse { func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusResponse {
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
var syncStatus = &volume_server_pb.VolumeSyncStatusResponse{} var syncStatus = &volume_server_pb.VolumeSyncStatusResponse{}
if stat, err := v.dataFile.Stat(); err == nil { if stat, err := v.dataFile.Stat(); err == nil {
syncStatus.TailOffset = uint64(stat.Size()) syncStatus.TailOffset = uint64(stat.Size())
} }
syncStatus.Collection = v.Collection syncStatus.Collection = v.Collection
syncStatus.IdxFileSize = v.nm.IndexFileSize()
syncStatus.IdxFileSize = v.IndexFileSize()
syncStatus.CompactRevision = uint32(v.SuperBlock.CompactionRevision) syncStatus.CompactRevision = uint32(v.SuperBlock.CompactionRevision)
syncStatus.Ttl = v.SuperBlock.Ttl.String() syncStatus.Ttl = v.SuperBlock.Ttl.String()
syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String() syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String()

4
weed/storage/volume_read_write.go

@ -21,6 +21,7 @@ func (v *Volume) isFileUnchanged(n *needle.Needle) bool {
if v.Ttl.String() != "" { if v.Ttl.String() != "" {
return false return false
} }
nv, ok := v.nm.Get(n.Id) nv, ok := v.nm.Get(n.Id)
if ok && !nv.Offset.IsZero() && nv.Size != TombstoneFileSize { if ok && !nv.Offset.IsZero() && nv.Size != TombstoneFileSize {
oldNeedle := new(needle.Needle) oldNeedle := new(needle.Needle)
@ -138,6 +139,9 @@ func (v *Volume) deleteNeedle(n *needle.Needle) (uint32, error) {
// read fills in Needle content by looking up n.Id from NeedleMapper // read fills in Needle content by looking up n.Id from NeedleMapper
func (v *Volume) readNeedle(n *needle.Needle) (int, error) { func (v *Volume) readNeedle(n *needle.Needle) (int, error) {
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
nv, ok := v.nm.Get(n.Id) nv, ok := v.nm.Get(n.Id)
if !ok || nv.Offset.IsZero() { if !ok || nv.Offset.IsZero() {
return -1, ErrorNotFound return -1, ErrorNotFound

4
weed/storage/volume_vacuum.go

@ -18,7 +18,7 @@ func (v *Volume) garbageLevel() float64 {
if v.ContentSize() == 0 { if v.ContentSize() == 0 {
return 0 return 0
} }
return float64(v.nm.DeletedSize()) / float64(v.ContentSize())
return float64(v.DeletedSize()) / float64(v.ContentSize())
} }
func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error { func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error {
@ -33,7 +33,7 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error
}() }()
filePath := v.FileName() filePath := v.FileName()
v.lastCompactIndexOffset = v.nm.IndexFileSize()
v.lastCompactIndexOffset = v.IndexFileSize()
v.lastCompactRevision = v.SuperBlock.CompactionRevision v.lastCompactRevision = v.SuperBlock.CompactionRevision
glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset) glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset)
return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond) return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond)

Loading…
Cancel
Save