diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index 801dfe267..258787701 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -10,6 +10,11 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/glog"
 )
 
+type keyField struct {
+	offset uint32
+	size   uint32
+}
+
 type Volume struct {
 	Id  VolumeId
 	dir string
@@ -23,6 +28,9 @@ type Volume struct {
 
 	dataFileAccessLock sync.Mutex
 	lastModifiedTime   uint64 //unix time in seconds
+
+	lastCompactingIndexOffset       uint64
+	incrementedHasUpdatedIndexEntry map[uint64]keyField
 }
 
 func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 51d74e311..55c248894 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -6,6 +6,7 @@ import (
 	"time"
 
 	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/util"
 )
 
 func (v *Volume) garbageLevel() float64 {
@@ -20,7 +21,8 @@ func (v *Volume) Compact() error {
 	//glog.V(3).Infof("Got Compaction lock...")
 
 	filePath := v.FileName()
-	glog.V(3).Infof("creating copies for volume %d ...", v.Id)
+	v.lastCompactingIndexOffset = v.nm.IndexFileSize()
+	glog.V(3).Infof("creating copies for volume %d, last offset %d...", v.Id, v.lastCompactingIndexOffset)
 	return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx")
 }
 
@@ -38,14 +40,28 @@ func (v *Volume) commitCompact() error {
 	glog.V(3).Infof("Got Committing lock...")
 	v.nm.Close()
 	_ = v.dataFile.Close()
-	makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx")
+
 	var e error
-	if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil {
-		return e
-	}
-	if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil {
-		return e
+	if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil {
+		// replaying the diff failed: discard the copies and keep the volume as is
+		glog.V(0).Infof("makeupDiff in commitCompact failed %v", e)
+		if e = os.Remove(v.FileName() + ".cpd"); e != nil {
+			return e
+		}
+		if e = os.Remove(v.FileName() + ".cpx"); e != nil {
+			return e
+		}
+	} else {
+		if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil {
+			return e
+		}
+		if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil {
+			return e
+		}
 	}
+
 	//glog.V(3).Infof("Pretending to be vacuuming...")
 	//time.Sleep(20 * time.Second)
 	glog.V(3).Infof("Loading Commit file...")
@@ -55,7 +71,85 @@ func (v *Volume) commitCompact() error {
 	return nil
 }
 
-func makeupDiff(newDatFile, newIdxFile, oldDatFile, oldIdxFile string) (err error) {
+func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldIdxFileName string) (err error) {
+	var indexSize int64
+
+	oldIdxFile, err := os.Open(oldIdxFileName)
+	if err != nil {
+		return fmt.Errorf("makeupDiff open %s failed: %v", oldIdxFileName, err)
+	}
+	defer oldIdxFile.Close()
+
+	oldDatFile, err := os.Open(oldDatFileName)
+	if err != nil {
+		return fmt.Errorf("makeupDiff open %s failed: %v", oldDatFileName, err)
+	}
+	defer oldDatFile.Close()
+
+	if indexSize, err = verifyIndexFileIntegrity(oldIdxFile); err != nil {
+		return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", oldIdxFileName, err)
+	}
+	if indexSize == 0 || uint64(indexSize) <= v.lastCompactingIndexOffset {
+		return nil
+	}
+
+	// scan the index entries appended while the copy was running, newest first,
+	// keeping only the latest entry for each key
+	v.incrementedHasUpdatedIndexEntry = make(map[uint64]keyField)
+	for idx_offset := indexSize - NeedleIndexSize; idx_offset >= 0 && uint64(idx_offset) >= v.lastCompactingIndexOffset; idx_offset -= NeedleIndexSize {
+		var IdxEntry []byte
+		if IdxEntry, err = readIndexEntryAtOffset(oldIdxFile, idx_offset); err != nil {
offset %d failed: %v", oldIdxFileName, idx_offset, err) + } + key, offset, size := idxFileEntry(IdxEntry) + if _, found := v.incrementedHasUpdatedIndexEntry[key]; !found { + v.incrementedHasUpdatedIndexEntry[key] = keyField{ + offset: offset, + size: size, + } + } else { + continue + } + } + + if len(v.incrementedHasUpdatedIndexEntry) > 0 { + var ( + dst, idx *os.File + ) + if dst, err = os.OpenFile(newDatFileName, os.O_WRONLY, 0644); err != nil { + return + } + defer dst.Close() + + if idx, err = os.OpenFile(newIdxFileName, os.O_WRONLY, 0644); err != nil { + return + } + defer idx.Close() + + idx_entry_bytes := make([]byte, 16) + for key, incre_idx_entry := range v.incrementedHasUpdatedIndexEntry { + util.Uint64toBytes(idx_entry_bytes[0:8], key) + util.Uint32toBytes(idx_entry_bytes[8:12], incre_idx_entry.offset) + util.Uint32toBytes(idx_entry_bytes[12:16], incre_idx_entry.size) + + if _, err := idx.Seek(0, 2); err != nil { + return fmt.Errorf("cannot seek end of indexfile %s: %v", + newIdxFileName, err) + } + _, err = idx.Write(idx_entry_bytes) + + //even the needle cache in memory is hit, the need_bytes is correct + needle_bytes, _, _ := ReadNeedleBlob(dst, int64(incre_idx_entry.offset)*NeedlePaddingSize, incre_idx_entry.size) + + var offset int64 + if offset, err = dst.Seek(0, 2); err != nil { + glog.V(0).Infof("failed to seek the end of file: %v", err) + return + } + //ensure file writing starting from aligned positions + if offset%NeedlePaddingSize != 0 { + offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize) + if offset, err = v.dataFile.Seek(offset, 0); err != nil { + glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err) + return + } + } + dst.Write(needle_bytes) + } + } + return nil }