|
|
@ -8,6 +8,7 @@ import ( |
|
|
|
"github.com/joeslay/seaweedfs/weed/glog" |
|
|
|
"github.com/joeslay/seaweedfs/weed/stats" |
|
|
|
idx2 "github.com/joeslay/seaweedfs/weed/storage/idx" |
|
|
|
"github.com/joeslay/seaweedfs/weed/storage/memory_map" |
|
|
|
"github.com/joeslay/seaweedfs/weed/storage/needle" |
|
|
|
"github.com/joeslay/seaweedfs/weed/storage/needle_map" |
|
|
|
. "github.com/joeslay/seaweedfs/weed/storage/types" |
|
|
@ -22,85 +23,100 @@ func (v *Volume) garbageLevel() float64 { |
|
|
|
} |
|
|
|
|
|
|
|
func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error { |
|
|
|
glog.V(3).Infof("Compacting volume %d ...", v.Id) |
|
|
|
//no need to lock for copy on write
|
|
|
|
//v.accessLock.Lock()
|
|
|
|
//defer v.accessLock.Unlock()
|
|
|
|
//glog.V(3).Infof("Got Compaction lock...")
|
|
|
|
v.isCompacting = true |
|
|
|
defer func() { |
|
|
|
v.isCompacting = false |
|
|
|
}() |
|
|
|
|
|
|
|
filePath := v.FileName() |
|
|
|
v.lastCompactIndexOffset = v.IndexFileSize() |
|
|
|
v.lastCompactRevision = v.SuperBlock.CompactionRevision |
|
|
|
glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset) |
|
|
|
return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond) |
|
|
|
|
|
|
|
_, exists := memory_map.FileMemoryMap[v.dataFile.Name()] |
|
|
|
if !exists { //it makes no sense to compact in memory
|
|
|
|
glog.V(3).Infof("Compacting volume %d ...", v.Id) |
|
|
|
//no need to lock for copy on write
|
|
|
|
//v.accessLock.Lock()
|
|
|
|
//defer v.accessLock.Unlock()
|
|
|
|
//glog.V(3).Infof("Got Compaction lock...")
|
|
|
|
v.isCompacting = true |
|
|
|
defer func() { |
|
|
|
v.isCompacting = false |
|
|
|
}() |
|
|
|
|
|
|
|
filePath := v.FileName() |
|
|
|
v.lastCompactIndexOffset = v.IndexFileSize() |
|
|
|
v.lastCompactRevision = v.SuperBlock.CompactionRevision |
|
|
|
glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset) |
|
|
|
return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond) |
|
|
|
} else { |
|
|
|
return nil |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (v *Volume) Compact2() error { |
|
|
|
glog.V(3).Infof("Compact2 volume %d ...", v.Id) |
|
|
|
_, exists := memory_map.FileMemoryMap[v.dataFile.Name()] |
|
|
|
if !exists { //it makes no sense to compact in memory
|
|
|
|
|
|
|
|
v.isCompacting = true |
|
|
|
defer func() { |
|
|
|
v.isCompacting = false |
|
|
|
}() |
|
|
|
glog.V(3).Infof("Compact2 volume %d ...", v.Id) |
|
|
|
|
|
|
|
filePath := v.FileName() |
|
|
|
glog.V(3).Infof("creating copies for volume %d ...", v.Id) |
|
|
|
return v.copyDataBasedOnIndexFile(filePath+".cpd", filePath+".cpx") |
|
|
|
v.isCompacting = true |
|
|
|
defer func() { |
|
|
|
v.isCompacting = false |
|
|
|
}() |
|
|
|
|
|
|
|
filePath := v.FileName() |
|
|
|
glog.V(3).Infof("creating copies for volume %d ...", v.Id) |
|
|
|
return v.copyDataBasedOnIndexFile(filePath+".cpd", filePath+".cpx") |
|
|
|
} else { |
|
|
|
return nil |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (v *Volume) CommitCompact() error { |
|
|
|
glog.V(0).Infof("Committing volume %d vacuuming...", v.Id) |
|
|
|
|
|
|
|
v.isCompacting = true |
|
|
|
defer func() { |
|
|
|
v.isCompacting = false |
|
|
|
}() |
|
|
|
|
|
|
|
v.dataFileAccessLock.Lock() |
|
|
|
defer v.dataFileAccessLock.Unlock() |
|
|
|
|
|
|
|
glog.V(3).Infof("Got volume %d committing lock...", v.Id) |
|
|
|
v.nm.Close() |
|
|
|
if err := v.dataFile.Close(); err != nil { |
|
|
|
glog.V(0).Infof("fail to close volume %d", v.Id) |
|
|
|
} |
|
|
|
v.dataFile = nil |
|
|
|
stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec() |
|
|
|
|
|
|
|
var e error |
|
|
|
if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil { |
|
|
|
glog.V(0).Infof("makeupDiff in CommitCompact volume %d failed %v", v.Id, e) |
|
|
|
e = os.Remove(v.FileName() + ".cpd") |
|
|
|
if e != nil { |
|
|
|
return e |
|
|
|
_, exists := memory_map.FileMemoryMap[v.dataFile.Name()] |
|
|
|
if !exists { //it makes no sense to compact in memory
|
|
|
|
glog.V(0).Infof("Committing volume %d vacuuming...", v.Id) |
|
|
|
|
|
|
|
v.isCompacting = true |
|
|
|
defer func() { |
|
|
|
v.isCompacting = false |
|
|
|
}() |
|
|
|
|
|
|
|
v.dataFileAccessLock.Lock() |
|
|
|
defer v.dataFileAccessLock.Unlock() |
|
|
|
|
|
|
|
glog.V(3).Infof("Got volume %d committing lock...", v.Id) |
|
|
|
v.nm.Close() |
|
|
|
if err := v.dataFile.Close(); err != nil { |
|
|
|
glog.V(0).Infof("fail to close volume %d", v.Id) |
|
|
|
} |
|
|
|
e = os.Remove(v.FileName() + ".cpx") |
|
|
|
if e != nil { |
|
|
|
return e |
|
|
|
} |
|
|
|
} else { |
|
|
|
v.dataFile = nil |
|
|
|
stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec() |
|
|
|
|
|
|
|
var e error |
|
|
|
if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil { |
|
|
|
return fmt.Errorf("rename %s: %v", v.FileName()+".cpd", e) |
|
|
|
} |
|
|
|
if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil { |
|
|
|
return fmt.Errorf("rename %s: %v", v.FileName()+".cpx", e) |
|
|
|
if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil { |
|
|
|
glog.V(0).Infof("makeupDiff in CommitCompact volume %d failed %v", v.Id, e) |
|
|
|
e = os.Remove(v.FileName() + ".cpd") |
|
|
|
if e != nil { |
|
|
|
return e |
|
|
|
} |
|
|
|
e = os.Remove(v.FileName() + ".cpx") |
|
|
|
if e != nil { |
|
|
|
return e |
|
|
|
} |
|
|
|
} else { |
|
|
|
var e error |
|
|
|
if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil { |
|
|
|
return fmt.Errorf("rename %s: %v", v.FileName()+".cpd", e) |
|
|
|
} |
|
|
|
if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil { |
|
|
|
return fmt.Errorf("rename %s: %v", v.FileName()+".cpx", e) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
//glog.V(3).Infof("Pretending to be vacuuming...")
|
|
|
|
//time.Sleep(20 * time.Second)
|
|
|
|
//glog.V(3).Infof("Pretending to be vacuuming...")
|
|
|
|
//time.Sleep(20 * time.Second)
|
|
|
|
|
|
|
|
os.RemoveAll(v.FileName() + ".ldb") |
|
|
|
os.RemoveAll(v.FileName() + ".bdb") |
|
|
|
os.RemoveAll(v.FileName() + ".ldb") |
|
|
|
os.RemoveAll(v.FileName() + ".bdb") |
|
|
|
|
|
|
|
glog.V(3).Infof("Loading volume %d commit file...", v.Id) |
|
|
|
if e = v.load(true, false, v.needleMapKind, 0); e != nil { |
|
|
|
return e |
|
|
|
glog.V(3).Infof("Loading volume %d commit file...", v.Id) |
|
|
|
if e = v.load(true, false, v.needleMapKind, 0); e != nil { |
|
|
|
return e |
|
|
|
} |
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
|