diff --git a/weed/storage/needle_map.go b/weed/storage/needle_map.go index ef3d4c151..682c88f78 100644 --- a/weed/storage/needle_map.go +++ b/weed/storage/needle_map.go @@ -8,6 +8,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/idx" "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" . "github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/syndtr/goleveldb/leveldb/opt" ) type NeedleMapKind int @@ -43,6 +44,13 @@ type baseNeedleMapper struct { indexFileOffset int64 } +type TempNeedleMapper interface { + NeedleMapper + DoOffsetLoading(v *Volume, indexFile *os.File, startFrom uint64) error + UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options) error + UpdateNeedleMapMetric(indexFile *os.File) error +} + func (nm *baseNeedleMapper) IndexFileSize() uint64 { stat, err := nm.indexFile.Stat() if err == nil { diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go index 3eccfcfc7..b93b00ebc 100644 --- a/weed/storage/needle_map_leveldb.go +++ b/weed/storage/needle_map_leveldb.go @@ -19,7 +19,7 @@ import ( . "github.com/seaweedfs/seaweedfs/weed/storage/types" ) -//mark it every watermarkBatchSize operations +// mark it every watermarkBatchSize operations const watermarkBatchSize = 10000 var watermarkKey = []byte("idx_entry_watermark") @@ -165,7 +165,7 @@ func getWatermark(db *leveldb.DB) uint64 { } func setWatermark(db *leveldb.DB, watermark uint64) error { - glog.V(1).Infof("set watermark %d", watermark) + glog.V(3).Infof("set watermark %d", watermark) var wmBytes = make([]byte, 8) util.Uint64toBytes(wmBytes, watermark) if err := db.Put(watermarkKey, wmBytes, nil); err != nil { @@ -215,12 +215,14 @@ func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error { } func (m *LevelDbNeedleMap) Close() { - indexFileName := m.indexFile.Name() - if err := m.indexFile.Sync(); err != nil { - glog.Warningf("sync file %s failed: %v", indexFileName, err) - } - if err := m.indexFile.Close(); err != nil { - glog.Warningf("close index file %s failed: %v", indexFileName, err) + if m.indexFile != nil { + indexFileName := m.indexFile.Name() + if err := m.indexFile.Sync(); err != nil { + glog.Warningf("sync file %s failed: %v", indexFileName, err) + } + if err := m.indexFile.Close(); err != nil { + glog.Warningf("close index file %s failed: %v", indexFileName, err) + } } if m.db != nil { @@ -235,3 +237,99 @@ func (m *LevelDbNeedleMap) Destroy() error { os.Remove(m.indexFile.Name()) return os.RemoveAll(m.dbFileName) } + +func (m *LevelDbNeedleMap) UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options) error { + if v.nm != nil { + v.nm.Close() + v.nm = nil + } + defer func() { + if v.tmpNm != nil { + v.tmpNm.Close() + v.tmpNm = nil + } + }() + levelDbFile := v.FileName(".ldb") + m.indexFile = indexFile + err := os.RemoveAll(levelDbFile) + if err != nil { + return err + } + if err = os.Rename(v.FileName(".cpldb"), levelDbFile); err != nil { + return fmt.Errorf("rename %s: %v", levelDbFile, err) + } + + db, err := leveldb.OpenFile(levelDbFile, opts) + if err != nil { + if errors.IsCorrupted(err) { + db, err = leveldb.RecoverFile(levelDbFile, opts) + } + if err != nil { + return err + } + } + m.db = db + + stat, e := indexFile.Stat() + if e != nil { + glog.Fatalf("stat file %s: %v", indexFile.Name(), e) + return e + } + m.indexFileOffset = stat.Size() + m.recordCount = uint64(stat.Size() / types.NeedleMapEntrySize) + + //set watermark + watermark := (m.recordCount / watermarkBatchSize) * watermarkBatchSize + err = setWatermark(db, uint64(watermark)) + if err != nil { + glog.Fatalf("setting watermark failed %s: %v", indexFile.Name(), err) + return err + } + v.nm = m + v.tmpNm = nil + return e +} + +func (m *LevelDbNeedleMap) DoOffsetLoading(v *Volume, indexFile *os.File, startFrom uint64) (err error) { + glog.V(0).Infof("loading idx to leveldb from offset %d for file: %s", startFrom, indexFile.Name()) + dbFileName := v.FileName(".cpldb") + db, dbErr := leveldb.OpenFile(dbFileName, nil) + defer func() { + if dbErr == nil { + db.Close() + } + if err != nil { + os.RemoveAll(dbFileName) + } + + }() + if dbErr != nil { + if errors.IsCorrupted(err) { + db, dbErr = leveldb.RecoverFile(dbFileName, nil) + } + if dbErr != nil { + return dbErr + } + } + + err = idx.WalkIndexFile(indexFile, startFrom, func(key NeedleId, offset Offset, size Size) (e error) { + if !offset.IsZero() && size.IsValid() { + e = levelDbWrite(db, key, offset, size, false, 0) + } else { + e = levelDbDelete(db, key) + } + return e + }) + if err != nil { + return err + } + + if startFrom != 0 { + return needleMapMetricFromIndexFile(indexFile, &m.mapMetric) + } + return nil +} + +func (m *LevelDbNeedleMap) UpdateNeedleMapMetric(indexFile *os.File) error { + return needleMapMetricFromIndexFile(indexFile, &m.mapMetric) +} diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go index 99a9f28e0..93b1fa4f5 100644 --- a/weed/storage/needle_map_memory.go +++ b/weed/storage/needle_map_memory.go @@ -7,6 +7,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/idx" "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" . "github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/syndtr/goleveldb/leveldb/opt" ) type NeedleMap struct { @@ -69,6 +70,9 @@ func (nm *NeedleMap) Delete(key NeedleId, offset Offset) error { return nm.appendToIndexFile(key, offset, TombstoneFileSize) } func (nm *NeedleMap) Close() { + if nm.indexFile == nil { + return + } indexFileName := nm.indexFile.Name() if err := nm.indexFile.Sync(); err != nil { glog.Warningf("sync file %s failed, %v", indexFileName, err) @@ -79,3 +83,53 @@ func (nm *NeedleMap) Destroy() error { nm.Close() return os.Remove(nm.indexFile.Name()) } + +func (nm *NeedleMap) UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options) error { + if v.nm != nil { + v.nm.Close() + v.nm = nil + } + defer func() { + if v.tmpNm != nil { + v.tmpNm.Close() + v.tmpNm = nil + } + }() + nm.indexFile = indexFile + stat, err := indexFile.Stat() + if err != nil { + glog.Fatalf("stat file %s: %v", indexFile.Name(), err) + return err + } + nm.indexFileOffset = stat.Size() + v.nm = nm + v.tmpNm = nil + return nil +} + +func (nm *NeedleMap) DoOffsetLoading(v *Volume, indexFile *os.File, startFrom uint64) error { + glog.V(0).Infof("loading idx from offset %d for file: %s", startFrom, indexFile.Name()) + e := idx.WalkIndexFile(indexFile, startFrom, func(key NeedleId, offset Offset, size Size) error { + nm.MaybeSetMaxFileKey(key) + nm.FileCounter++ + if !offset.IsZero() && size.IsValid() { + nm.FileByteCounter = nm.FileByteCounter + uint64(size) + oldOffset, oldSize := nm.m.Set(NeedleId(key), offset, size) + if !oldOffset.IsZero() && oldSize.IsValid() { + nm.DeletionCounter++ + nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) + } + } else { + oldSize := nm.m.Delete(NeedleId(key)) + nm.DeletionCounter++ + nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) + } + return nil + }) + + return e +} + +func (m *NeedleMap) UpdateNeedleMapMetric(indexFile *os.File) error { + return nil +} diff --git a/weed/storage/needle_map_metric.go b/weed/storage/needle_map_metric.go index 3cda63dc5..7669180ba 100644 --- a/weed/storage/needle_map_metric.go +++ b/weed/storage/needle_map_metric.go @@ -91,11 +91,10 @@ func (mm *mapMetric) MaybeSetMaxFileKey(key NeedleId) { } } -func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) { - mm = &mapMetric{} +func needleMapMetricFromIndexFile(r *os.File, mm *mapMetric) error { var bf *boom.BloomFilter buf := make([]byte, NeedleIdSize) - err = reverseWalkIndexFile(r, func(entryCount int64) { + err := reverseWalkIndexFile(r, func(entryCount int64) { bf = boom.NewBloomFilter(uint(entryCount), 0.001) }, func(key NeedleId, offset Offset, size Size) error { @@ -121,6 +120,12 @@ func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) { } return nil }) + return err +} + +func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) { + mm = &mapMetric{} + err = needleMapMetricFromIndexFile(r, mm) return } diff --git a/weed/storage/volume.go b/weed/storage/volume.go index 4a2ed706d..91d2d0d1f 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -25,6 +25,7 @@ type Volume struct { Collection string DataBackend backend.BackendStorageFile nm NeedleMapper + tmpNm TempNeedleMapper needleMapKind NeedleMapKind noWriteOrDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete noWriteCanDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index aa7cf1cfa..97b69ad71 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -133,39 +133,59 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind } else { switch needleMapKind { case NeedleMapInMemory: - glog.V(0).Infoln("loading index", v.FileName(".idx"), "to memory") - if v.nm, err = LoadCompactNeedleMap(indexFile); err != nil { - glog.V(0).Infof("loading index %s to memory error: %v", v.FileName(".idx"), err) + if v.tmpNm != nil { + glog.V(0).Infof("updating memory compact index %s ", v.FileName(".idx")) + err = v.tmpNm.UpdateNeedleMap(v, indexFile, nil) + } else { + glog.V(0).Infoln("loading memory index", v.FileName(".idx"), "to memory") + if v.nm, err = LoadCompactNeedleMap(indexFile); err != nil { + glog.V(0).Infof("loading index %s to memory error: %v", v.FileName(".idx"), err) + } } case NeedleMapLevelDb: - glog.V(0).Infoln("loading leveldb", v.FileName(".ldb")) opts := &opt.Options{ BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB CompactionTableSizeMultiplier: 10, // default value is 1 } - if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts); err != nil { - glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err) + if v.tmpNm != nil { + glog.V(0).Infoln("updating leveldb index", v.FileName(".ldb")) + err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts) + } else { + glog.V(0).Infoln("loading leveldb index", v.FileName(".ldb")) + if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts); err != nil { + glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err) + } } case NeedleMapLevelDbMedium: - glog.V(0).Infoln("loading leveldb medium", v.FileName(".ldb")) opts := &opt.Options{ BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB CompactionTableSizeMultiplier: 10, // default value is 1 } - if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts); err != nil { - glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err) + if v.tmpNm != nil { + glog.V(0).Infoln("updating leveldb medium index", v.FileName(".ldb")) + err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts) + } else { + glog.V(0).Infoln("loading leveldb medium index", v.FileName(".ldb")) + if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts); err != nil { + glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err) + } } case NeedleMapLevelDbLarge: - glog.V(0).Infoln("loading leveldb large", v.FileName(".ldb")) opts := &opt.Options{ BlockCacheCapacity: 8 * 1024 * 1024, // default value is 8MiB WriteBuffer: 4 * 1024 * 1024, // default value is 4MiB CompactionTableSizeMultiplier: 10, // default value is 1 } - if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts); err != nil { - glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err) + if v.tmpNm != nil { + glog.V(0).Infoln("updating leveldb large index", v.FileName(".ldb")) + err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts) + } else { + glog.V(0).Infoln("loading leveldb large index", v.FileName(".ldb")) + if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts); err != nil { + glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err) + } } } } diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index ba00666bc..642b3eab5 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -13,6 +13,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + "github.com/seaweedfs/seaweedfs/weed/storage/types" . "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" ) @@ -88,7 +89,7 @@ func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64, prog if err := v.nm.Sync(); err != nil { glog.V(0).Infof("compact2 fail to sync volume idx %d: %v", v.Id, err) } - return copyDataBasedOnIndexFile( + return v.copyDataBasedOnIndexFile( v.FileName(".dat"), v.FileName(".idx"), v.FileName(".cpd"), v.FileName(".cpx"), v.SuperBlock, @@ -114,7 +115,10 @@ func (v *Volume) CommitCompact() error { defer v.dataFileAccessLock.Unlock() glog.V(3).Infof("Got volume %d committing lock...", v.Id) - v.nm.Close() + if v.nm != nil { + v.nm.Close() + v.nm = nil + } if v.DataBackend != nil { if err := v.DataBackend.Close(); err != nil { glog.V(0).Infof("fail to close volume %d", v.Id) @@ -163,6 +167,7 @@ func (v *Volume) CommitCompact() error { if e = v.load(true, false, v.needleMapKind, 0); e != nil { return e } + glog.V(3).Infof("Finish commiting volume %d", v.Id) return nil } @@ -171,12 +176,16 @@ func (v *Volume) cleanupCompact() error { e1 := os.Remove(v.FileName(".cpd")) e2 := os.Remove(v.FileName(".cpx")) + e3 := os.RemoveAll(v.FileName(".cpldb")) if e1 != nil && !os.IsNotExist(e1) { return e1 } if e2 != nil && !os.IsNotExist(e2) { return e2 } + if e3 != nil && !os.IsNotExist(e3) { + return e3 + } return nil } @@ -210,7 +219,15 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", oldIdxFileName, err) } if indexSize == 0 || uint64(indexSize) <= v.lastCompactIndexOffset { - return nil + if v.needleMapKind == NeedleMapInMemory { + return nil + } + newIdx, err := os.OpenFile(newIdxFileName, os.O_RDWR, 0644) + if err != nil { + return fmt.Errorf("open idx file %s failed: %v", newIdxFileName, err) + } + defer newIdx.Close() + return v.tmpNm.UpdateNeedleMapMetric(newIdx) } // fail if the old .dat file has changed to a new revision @@ -261,7 +278,13 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI if idx, err = os.OpenFile(newIdxFileName, os.O_RDWR, 0644); err != nil { return fmt.Errorf("open idx file %s failed: %v", newIdxFileName, err) } + defer idx.Close() + stat, err := idx.Stat() + if err != nil { + return fmt.Errorf("stat file %s: %v", idx.Name(), err) + } + idxSize := stat.Size() var newDatCompactRevision uint16 newDatCompactRevision, err = fetchCompactRevisionFromDatFile(dstDatBackend) @@ -289,7 +312,6 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI return } } - //updated needle if !increIdxEntry.offset.IsZero() && increIdxEntry.size != 0 && increIdxEntry.size.IsValid() { //even the needle cache in memory is hit, the need_bytes is correct @@ -327,7 +349,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI } } - return nil + return v.tmpNm.DoOffsetLoading(v, idx, uint64(idxSize)/types.NeedleMapEntrySize) } type VolumeFileScanner4Vacuum struct { @@ -400,7 +422,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca return nm.SaveToIdx(idxName) } -func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate, compactionBytePerSecond int64, progressFn ProgressFunc) (err error) { +func (v *Volume) copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate, compactionBytePerSecond int64, progressFn ProgressFunc) (err error) { var ( srcDatBackend, dstDatBackend backend.BackendStorageFile dataFile *os.File @@ -430,7 +452,6 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str newOffset := int64(sb.BlockSize()) writeThrottler := util.NewWriteThrottler(compactionBytePerSecond) - err = oldNm.AscendingVisit(func(value needle_map.NeedleValue) error { offset, size := value.Offset, value.Size @@ -471,6 +492,41 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str return err } - return newNm.SaveToIdx(datIdxName) + err = newNm.SaveToIdx(datIdxName) + if err != nil { + return err + } + + indexFile, err := os.OpenFile(datIdxName, os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + glog.Errorf("cannot open Volume Index %s: %v", datIdxName, err) + return err + } + defer indexFile.Close() + if v.tmpNm != nil { + v.tmpNm.Close() + v.tmpNm = nil + } + if v.needleMapKind == NeedleMapInMemory { + nm := &NeedleMap{ + m: needle_map.NewCompactMap(), + } + v.tmpNm = nm + //can be optimized, filling nm in oldNm.AscendingVisit + err = v.tmpNm.DoOffsetLoading(nil, indexFile, 0) + return err + } else { + dbFileName := v.FileName(".ldb") + m := &LevelDbNeedleMap{dbFileName: dbFileName} + m.dbFileName = dbFileName + mm := &mapMetric{} + m.mapMetric = *mm + v.tmpNm = m + err = v.tmpNm.DoOffsetLoading(v, indexFile, 0) + if err != nil { + return err + } + } + return } diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go index f4f38c097..d26fc7ab7 100644 --- a/weed/storage/volume_vacuum_test.go +++ b/weed/storage/volume_vacuum_test.go @@ -2,6 +2,7 @@ package storage import ( "math/rand" + "reflect" "testing" "time" @@ -60,10 +61,18 @@ func TestMakeDiff(t *testing.T) { */ } -func TestCompaction(t *testing.T) { +func TestMemIndexCompaction(t *testing.T) { + testCompaction(t, NeedleMapInMemory) +} + +func TestLDBIndexCompaction(t *testing.T) { + testCompaction(t, NeedleMapLevelDb) +} + +func testCompaction(t *testing.T, needleMapKind NeedleMapKind) { dir := t.TempDir() - v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0) + v, err := NewVolume(dir, dir, "", 1, needleMapKind, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0) if err != nil { t.Fatalf("volume creation: %v", err) } @@ -82,15 +91,31 @@ func TestCompaction(t *testing.T) { speed := float64(v.ContentSize()) / time.Now().Sub(startTime).Seconds() t.Logf("compaction speed: %.2f bytes/s", speed) - for i := 1; i <= afterCommitFileCount; i++ { - doSomeWritesDeletes(i+beforeCommitFileCount, v, t, infos) + // update & delete original objects, upload & delete new objects + for i := 1; i <= afterCommitFileCount+beforeCommitFileCount; i++ { + doSomeWritesDeletes(i, v, t, infos) } - v.CommitCompact() + realRecordCount := v.nm.IndexFileSize() / types.NeedleMapEntrySize + if needleMapKind == NeedleMapLevelDb { + nm := reflect.ValueOf(v.nm).Interface().(*LevelDbNeedleMap) + mm := nm.mapMetric + watermark := getWatermark(nm.db) + realWatermark := (nm.recordCount / watermarkBatchSize) * watermarkBatchSize + t.Logf("watermark from levelDB: %d, realWatermark: %d, nm.recordCount: %d, realRecordCount:%d, fileCount=%d, deletedcount:%d", watermark, realWatermark, nm.recordCount, realRecordCount, mm.FileCount(), v.DeletedCount()) + if realWatermark != watermark { + t.Fatalf("testing watermark failed") + } + } else { + t.Logf("realRecordCount:%d, v.FileCount():%d mm.DeletedCount():%d", realRecordCount, v.FileCount(), v.DeletedCount()) + } + if realRecordCount != v.FileCount() { + t.Fatalf("testing file count failed") + } v.Close() - v, err = NewVolume(dir, dir, "", 1, NeedleMapInMemory, nil, nil, 0, 0) + v, err = NewVolume(dir, dir, "", 1, needleMapKind, nil, nil, 0, 0) if err != nil { t.Fatalf("volume reloading: %v", err) }