diff --git a/weed/storage/needle_map.go b/weed/storage/needle_map.go index 682c88f78..0e49da959 100644 --- a/weed/storage/needle_map.go +++ b/weed/storage/needle_map.go @@ -48,7 +48,6 @@ type TempNeedleMapper interface { NeedleMapper DoOffsetLoading(v *Volume, indexFile *os.File, startFrom uint64) error UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options) error - UpdateNeedleMapMetric(indexFile *os.File) error } func (nm *baseNeedleMapper) IndexFileSize() uint64 { diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go index e77651b78..30ed96c3b 100644 --- a/weed/storage/needle_map_leveldb.go +++ b/weed/storage/needle_map_leveldb.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "path/filepath" + "strings" "github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/opt" @@ -179,6 +180,7 @@ func levelDbWrite(db *leveldb.DB, key NeedleId, offset Offset, size Size, update } return nil } + func levelDbDelete(db *leveldb.DB, key NeedleId) error { bytes := make([]byte, NeedleIdSize) NeedleIdToBytes(bytes, key) @@ -305,23 +307,45 @@ func (m *LevelDbNeedleMap) DoOffsetLoading(v *Volume, indexFile *os.File, startF } err = idx.WalkIndexFile(indexFile, startFrom, func(key NeedleId, offset Offset, size Size) (e error) { - if !offset.IsZero() && size.IsValid() { + m.mapMetric.FileCounter++ + bytes := make([]byte, NeedleIdSize) + NeedleIdToBytes(bytes[0:NeedleIdSize], key) + // fresh loading + if startFrom == 0 { + m.mapMetric.FileByteCounter += uint64(size) + e = levelDbWrite(db, key, offset, size, false, 0) + return e + } + // increment loading + data, err := db.Get(bytes, nil) + if err != nil { + if !strings.Contains(strings.ToLower(err.Error()), "not found") { + // unexpected error + return err + } + // new needle, unlikely happen + m.mapMetric.FileByteCounter += uint64(size) e = levelDbWrite(db, key, offset, size, false, 0) } else { - e = levelDbDelete(db, key) + // needle is found + oldSize := BytesToSize(data[OffsetSize : OffsetSize+SizeSize]) + oldOffset := BytesToOffset(data[0:OffsetSize]) + if !offset.IsZero() && size.IsValid() { + // updated needle + m.mapMetric.FileByteCounter += uint64(size) + if !oldOffset.IsZero() && oldSize.IsValid() { + m.mapMetric.DeletionCounter++ + m.mapMetric.DeletionByteCounter += uint64(oldSize) + } + e = levelDbWrite(db, key, offset, size, false, 0) + } else { + // deleted needle + m.mapMetric.DeletionCounter++ + m.mapMetric.DeletionByteCounter += uint64(oldSize) + e = levelDbDelete(db, key) + } } return e }) - if err != nil { - return err - } - - if startFrom != 0 { - return needleMapMetricFromIndexFile(indexFile, &m.mapMetric) - } - return nil -} - -func (m *LevelDbNeedleMap) UpdateNeedleMapMetric(indexFile *os.File) error { - return needleMapMetricFromIndexFile(indexFile, &m.mapMetric) + return err } diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go index 93b1fa4f5..7721980ee 100644 --- a/weed/storage/needle_map_memory.go +++ b/weed/storage/needle_map_memory.go @@ -129,7 +129,3 @@ func (nm *NeedleMap) DoOffsetLoading(v *Volume, indexFile *os.File, startFrom ui return e } - -func (m *NeedleMap) UpdateNeedleMapMetric(indexFile *os.File) error { - return nil -} diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 2862ca94d..38b5c0080 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -219,15 +219,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", oldIdxFileName, err) } if indexSize == 0 || uint64(indexSize) <= v.lastCompactIndexOffset { - if v.needleMapKind == NeedleMapInMemory { - return nil - } - newIdx, err := os.OpenFile(newIdxFileName, os.O_RDWR, 0644) - if err != nil { - return fmt.Errorf("open idx file %s failed: %v", newIdxFileName, err) - } - defer newIdx.Close() - return v.tmpNm.UpdateNeedleMapMetric(newIdx) + return nil } // fail if the old .dat file has changed to a new revision