|
|
@ -6,6 +6,7 @@ import ( |
|
|
|
"time" |
|
|
|
|
|
|
|
"github.com/chrislusf/seaweedfs/weed/glog" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/util" |
|
|
|
) |
|
|
|
|
|
|
|
func (v *Volume) garbageLevel() float64 { |
|
|
@ -20,7 +21,8 @@ func (v *Volume) Compact() error { |
|
|
|
//glog.V(3).Infof("Got Compaction lock...")
|
|
|
|
|
|
|
|
filePath := v.FileName() |
|
|
|
glog.V(3).Infof("creating copies for volume %d ...", v.Id) |
|
|
|
v.lastCompactingIndexOffset = v.nm.IndexFileSize() |
|
|
|
glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactingIndexOffset) |
|
|
|
return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx") |
|
|
|
} |
|
|
|
|
|
|
@ -38,7 +40,19 @@ func (v *Volume) commitCompact() error { |
|
|
|
glog.V(3).Infof("Got Committing lock...") |
|
|
|
v.nm.Close() |
|
|
|
_ = v.dataFile.Close() |
|
|
|
makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx") |
|
|
|
|
|
|
|
var e error |
|
|
|
if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil { |
|
|
|
glog.V(0).Infof("makeupDiff in commitCompact failed %v", e) |
|
|
|
e = os.Remove(v.FileName() + ".cpd") |
|
|
|
if e != nil { |
|
|
|
return e |
|
|
|
} |
|
|
|
e = os.Remove(v.FileName() + ".cpx") |
|
|
|
if e != nil { |
|
|
|
return e |
|
|
|
} |
|
|
|
} else { |
|
|
|
var e error |
|
|
|
if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil { |
|
|
|
return e |
|
|
@ -46,6 +60,8 @@ func (v *Volume) commitCompact() error { |
|
|
|
if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil { |
|
|
|
return e |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
//glog.V(3).Infof("Pretending to be vacuuming...")
|
|
|
|
//time.Sleep(20 * time.Second)
|
|
|
|
glog.V(3).Infof("Loading Commit file...") |
|
|
@ -55,7 +71,85 @@ func (v *Volume) commitCompact() error { |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func makeupDiff(newDatFile, newIdxFile, oldDatFile, oldIdxFile string) (err error) { |
|
|
|
func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldIdxFileName string) (err error) { |
|
|
|
var indexSize int64 |
|
|
|
|
|
|
|
oldIdxFile, err := os.Open(oldIdxFileName) |
|
|
|
defer oldIdxFile.Close() |
|
|
|
|
|
|
|
oldDatFile, err := os.Open(oldDatFileName) |
|
|
|
defer oldDatFile.Close() |
|
|
|
|
|
|
|
if indexSize, err = verifyIndexFileIntegrity(oldIdxFile); err != nil { |
|
|
|
return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", oldIdxFileName, err) |
|
|
|
} |
|
|
|
if indexSize == 0 || uint64(indexSize) <= v.lastCompactingIndexOffset { |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
v.incrementedHasUpdatedIndexEntry = make(map[uint64]keyField) |
|
|
|
for idx_offset := indexSize; uint64(idx_offset) >= v.lastCompactingIndexOffset; idx_offset -= NeedleIndexSize { |
|
|
|
var IdxEntry []byte |
|
|
|
if IdxEntry, err = readIndexEntryAtOffset(oldIdxFile, idx_offset); err != nil { |
|
|
|
return fmt.Errorf("readIndexEntry %s at offset %d failed: %v", oldIdxFileName, idx_offset, err) |
|
|
|
} |
|
|
|
key, offset, size := idxFileEntry(IdxEntry) |
|
|
|
if _, found := v.incrementedHasUpdatedIndexEntry[key]; !found { |
|
|
|
v.incrementedHasUpdatedIndexEntry[key] = keyField{ |
|
|
|
offset: offset, |
|
|
|
size: size, |
|
|
|
} |
|
|
|
} else { |
|
|
|
continue |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if len(v.incrementedHasUpdatedIndexEntry) > 0 { |
|
|
|
var ( |
|
|
|
dst, idx *os.File |
|
|
|
) |
|
|
|
if dst, err = os.OpenFile(newDatFileName, os.O_WRONLY, 0644); err != nil { |
|
|
|
return |
|
|
|
} |
|
|
|
defer dst.Close() |
|
|
|
|
|
|
|
if idx, err = os.OpenFile(newIdxFileName, os.O_WRONLY, 0644); err != nil { |
|
|
|
return |
|
|
|
} |
|
|
|
defer idx.Close() |
|
|
|
|
|
|
|
idx_entry_bytes := make([]byte, 16) |
|
|
|
for key, incre_idx_entry := range v.incrementedHasUpdatedIndexEntry { |
|
|
|
util.Uint64toBytes(idx_entry_bytes[0:8], key) |
|
|
|
util.Uint32toBytes(idx_entry_bytes[8:12], incre_idx_entry.offset) |
|
|
|
util.Uint32toBytes(idx_entry_bytes[12:16], incre_idx_entry.size) |
|
|
|
|
|
|
|
if _, err := idx.Seek(0, 2); err != nil { |
|
|
|
return fmt.Errorf("cannot seek end of indexfile %s: %v", |
|
|
|
newIdxFileName, err) |
|
|
|
} |
|
|
|
_, err = idx.Write(idx_entry_bytes) |
|
|
|
|
|
|
|
//even the needle cache in memory is hit, the need_bytes is correct
|
|
|
|
needle_bytes, _, _ := ReadNeedleBlob(dst, int64(incre_idx_entry.offset)*NeedlePaddingSize, incre_idx_entry.size) |
|
|
|
|
|
|
|
var offset int64 |
|
|
|
if offset, err = dst.Seek(0, 2); err != nil { |
|
|
|
glog.V(0).Infof("failed to seek the end of file: %v", err) |
|
|
|
return |
|
|
|
} |
|
|
|
//ensure file writing starting from aligned positions
|
|
|
|
if offset%NeedlePaddingSize != 0 { |
|
|
|
offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize) |
|
|
|
if offset, err = v.dataFile.Seek(offset, 0); err != nil { |
|
|
|
glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err) |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
dst.Write(needle_bytes) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|