|
|
@ -57,7 +57,7 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error |
|
|
|
} |
|
|
|
|
|
|
|
// compact a volume based on deletions in .idx files
|
|
|
|
func (v *Volume) Compact2(preallocate int64) error { |
|
|
|
func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64) error { |
|
|
|
|
|
|
|
if v.MemoryMapMaxSizeMb != 0 { //it makes no sense to compact in memory
|
|
|
|
return nil |
|
|
@ -73,7 +73,7 @@ func (v *Volume) Compact2(preallocate int64) error { |
|
|
|
v.lastCompactIndexOffset = v.IndexFileSize() |
|
|
|
v.lastCompactRevision = v.SuperBlock.CompactionRevision |
|
|
|
glog.V(3).Infof("creating copies for volume %d ...", v.Id) |
|
|
|
return copyDataBasedOnIndexFile(filePath+".dat", filePath+".idx", filePath+".cpd", filePath+".cpx", v.SuperBlock, v.Version(), preallocate) |
|
|
|
return copyDataBasedOnIndexFile(filePath+".dat", filePath+".idx", filePath+".cpd", filePath+".cpx", v.SuperBlock, v.Version(), preallocate, compactionBytePerSecond) |
|
|
|
} |
|
|
|
|
|
|
|
func (v *Volume) CommitCompact() error { |
|
|
@ -366,7 +366,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate int64) (err error) { |
|
|
|
func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate int64, compactionBytePerSecond int64) (err error) { |
|
|
|
var ( |
|
|
|
srcDatBackend, dstDatBackend backend.BackendStorageFile |
|
|
|
dataFile *os.File |
|
|
@ -395,6 +395,8 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str |
|
|
|
dstDatBackend.WriteAt(sb.Bytes(), 0) |
|
|
|
newOffset := int64(sb.BlockSize()) |
|
|
|
|
|
|
|
writeThrottler := util.NewWriteThrottler(compactionBytePerSecond) |
|
|
|
|
|
|
|
oldNm.AscendingVisit(func(value needle_map.NeedleValue) error { |
|
|
|
|
|
|
|
offset, size := value.Offset, value.Size |
|
|
@ -419,7 +421,9 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str |
|
|
|
if _, _, _, err = n.Append(dstDatBackend, sb.Version); err != nil { |
|
|
|
return fmt.Errorf("cannot append needle: %s", err) |
|
|
|
} |
|
|
|
newOffset += n.DiskSize(version) |
|
|
|
delta := n.DiskSize(version) |
|
|
|
newOffset += delta |
|
|
|
writeThrottler.MaybeSlowdown(delta) |
|
|
|
glog.V(4).Infoln("saving key", n.Id, "volume offset", offset, "=>", newOffset, "data_size", n.Size) |
|
|
|
|
|
|
|
return nil |
|
|
|