From 13e7069eb9cd72f94e72acb8fbbc9dd0307da703 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 6 Jan 2017 10:22:20 -0800 Subject: [PATCH] keep track of total data file size --- unmaintained/fix_dat/fix_dat.go | 2 +- weed/command/fix.go | 2 +- weed/storage/disk_location.go | 7 ++++++- weed/storage/needle.go | 2 ++ weed/storage/needle_map.go | 2 +- weed/storage/needle_map_boltdb.go | 6 +++--- weed/storage/needle_map_leveldb.go | 6 +++--- weed/storage/needle_map_memory.go | 6 +++--- weed/storage/needle_read_write.go | 14 +++++++++----- weed/storage/volume.go | 1 + weed/storage/volume_checking.go | 22 ++++++++++++++-------- weed/storage/volume_loading.go | 3 ++- weed/storage/volume_read_write.go | 30 +++++++++++++++++++++++------- weed/storage/volume_super_block.go | 1 + weed/storage/volume_sync.go | 2 +- weed/storage/volume_vacuum.go | 10 +++++----- 16 files changed, 76 insertions(+), 40 deletions(-) diff --git a/unmaintained/fix_dat/fix_dat.go b/unmaintained/fix_dat/fix_dat.go index bcb985fe9..1f95e6cd6 100644 --- a/unmaintained/fix_dat/fix_dat.go +++ b/unmaintained/fix_dat/fix_dat.go @@ -60,7 +60,7 @@ func main() { iterateEntries(datFile, indexFile, func(n *storage.Needle, offset int64) { fmt.Printf("file id=%d name=%s size=%d dataSize=%d\n", n.Id, string(n.Name), n.Size, n.DataSize) - s, e := n.Append(newDatFile, storage.Version2) + s, _, e := n.Append(newDatFile, storage.Version2) fmt.Printf("size %d error %v\n", s, e) }) diff --git a/weed/command/fix.go b/weed/command/fix.go index 2ec74d026..22480dcd0 100644 --- a/weed/command/fix.go +++ b/weed/command/fix.go @@ -58,7 +58,7 @@ func runFix(cmd *Command, args []string) bool { glog.V(2).Infof("saved %d with error %v", n.Size, pe) } else { glog.V(2).Infof("skipping deleted file ...") - return nm.Delete(n.Id) + return nm.Delete(n.Id, uint32(offset/storage.NeedlePaddingSize)) } return nil }) diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index e7604a734..039b4f3b9 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -40,7 +40,12 @@ func (l *DiskLocation) loadExistingVolume(dir os.FileInfo, needleMapKind NeedleM mutex.Lock() l.volumes[vid] = v mutex.Unlock() - glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String()) + glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", + l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String()) + if v.Size() != v.dataFileSize { + glog.V(0).Infof("data file %s, size=%d expected=%d", + l.Directory+"/"+name, v.Size(), v.dataFileSize) + } } else { glog.V(0).Infof("new volume %s error %s", name, e) } diff --git a/weed/storage/needle.go b/weed/storage/needle.go index daa050be8..1d306395e 100644 --- a/weed/storage/needle.go +++ b/weed/storage/needle.go @@ -3,6 +3,7 @@ package storage import ( "fmt" "io/ioutil" + "math" "mime" "net/http" "path" @@ -20,6 +21,7 @@ const ( NeedlePaddingSize = 8 NeedleChecksumSize = 4 MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8 + TombstoneFileSize = math.MaxUint32 ) /* diff --git a/weed/storage/needle_map.go b/weed/storage/needle_map.go index 142018946..15a0387c5 100644 --- a/weed/storage/needle_map.go +++ b/weed/storage/needle_map.go @@ -24,7 +24,7 @@ const ( type NeedleMapper interface { Put(key uint64, offset uint32, size uint32) error Get(key uint64) (element *NeedleValue, ok bool) - Delete(key uint64) error + Delete(key uint64, offset uint32) error Close() Destroy() error ContentSize() uint64 diff --git a/weed/storage/needle_map_boltdb.go b/weed/storage/needle_map_boltdb.go index bd3edf28d..e131ea822 100644 --- a/weed/storage/needle_map_boltdb.go +++ b/weed/storage/needle_map_boltdb.go @@ -63,7 +63,7 @@ func generateBoltDbFile(dbFileName string, indexFile *os.File) error { } defer db.Close() return WalkIndexFile(indexFile, func(key uint64, offset, size uint32) error { - if offset > 0 { + if offset > 0 && size != TombstoneFileSize { boltDbWrite(db, key, offset, size) } else { boltDbDelete(db, key) @@ -143,12 +143,12 @@ func boltDbDelete(db *bolt.DB, key uint64) error { }) } -func (m *BoltDbNeedleMap) Delete(key uint64) error { +func (m *BoltDbNeedleMap) Delete(key uint64, offset uint32) error { if oldNeedle, ok := m.Get(key); ok { m.logDelete(oldNeedle.Size) } // write to index file first - if err := m.appendToIndexFile(key, 0, 0); err != nil { + if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil { return err } return boltDbDelete(m.db, key) diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go index 1789dbb12..f025ea360 100644 --- a/weed/storage/needle_map_leveldb.go +++ b/weed/storage/needle_map_leveldb.go @@ -61,7 +61,7 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error { } defer db.Close() return WalkIndexFile(indexFile, func(key uint64, offset, size uint32) error { - if offset > 0 { + if offset > 0 && size != TombstoneFileSize { levelDbWrite(db, key, offset, size) } else { levelDbDelete(db, key) @@ -112,12 +112,12 @@ func levelDbDelete(db *leveldb.DB, key uint64) error { return db.Delete(bytes, nil) } -func (m *LevelDbNeedleMap) Delete(key uint64) error { +func (m *LevelDbNeedleMap) Delete(key uint64, offset uint32) error { if oldNeedle, ok := m.Get(key); ok { m.logDelete(oldNeedle.Size) } // write to index file first - if err := m.appendToIndexFile(key, 0, 0); err != nil { + if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil { return err } return levelDbDelete(m.db, key) diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go index 195d8bdbc..6fa929d90 100644 --- a/weed/storage/needle_map_memory.go +++ b/weed/storage/needle_map_memory.go @@ -33,7 +33,7 @@ func LoadNeedleMap(file *os.File) (*NeedleMap, error) { } nm.FileCounter++ nm.FileByteCounter = nm.FileByteCounter + uint64(size) - if offset > 0 { + if offset > 0 && size != TombstoneFileSize { oldSize := nm.m.Set(Key(key), offset, size) glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) if oldSize > 0 { @@ -92,10 +92,10 @@ func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { element, ok = nm.m.Get(Key(key)) return } -func (nm *NeedleMap) Delete(key uint64) error { +func (nm *NeedleMap) Delete(key uint64, offset uint32) error { deletedBytes := nm.m.Delete(Key(key)) nm.logDelete(deletedBytes) - return nm.appendToIndexFile(key, 0, 0) + return nm.appendToIndexFile(key, offset, TombstoneFileSize) } func (nm *NeedleMap) Close() { _ = nm.indexFile.Close() diff --git a/weed/storage/needle_read_write.go b/weed/storage/needle_read_write.go index 3ac236951..8baa325df 100644 --- a/weed/storage/needle_read_write.go +++ b/weed/storage/needle_read_write.go @@ -22,10 +22,10 @@ const ( ) func (n *Needle) DiskSize() int64 { - padding := NeedlePaddingSize - ((NeedleHeaderSize + int64(n.Size) + NeedleChecksumSize) % NeedlePaddingSize) - return NeedleHeaderSize + int64(n.Size) + padding + NeedleChecksumSize + return getActualSize(n.Size) } -func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) { + +func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize int64, err error) { if s, ok := w.(io.Seeker); ok { if end, e := s.Seek(0, 1); e == nil { defer func(s io.Seeker, off int64) { @@ -54,6 +54,7 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) { if _, err = w.Write(n.Data); err != nil { return } + actualSize = NeedleHeaderSize + int64(n.Size) padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) _, err = w.Write(header[0 : NeedleChecksumSize+padding]) @@ -131,9 +132,12 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) { padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) _, err = w.Write(header[0 : NeedleChecksumSize+padding]) - return n.DataSize, err + + actualSize = NeedleHeaderSize + int64(n.Size) + NeedleChecksumSize + int64(padding) + + return n.DataSize, actualSize, err } - return 0, fmt.Errorf("Unsupported Version! (%d)", version) + return 0, 0, fmt.Errorf("Unsupported Version! (%d)", version) } func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, block *Block, err error) { diff --git a/weed/storage/volume.go b/weed/storage/volume.go index dfd623eaa..11ee600df 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -15,6 +15,7 @@ type Volume struct { dir string Collection string dataFile *os.File + dataFileSize int64 nm NeedleMapper needleMapKind NeedleMapType readOnly bool diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go index 48f707594..6d4011f27 100644 --- a/weed/storage/volume_checking.go +++ b/weed/storage/volume_checking.go @@ -7,27 +7,33 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) error { +func getActualSize(size uint32) int64 { + padding := NeedlePaddingSize - ((NeedleHeaderSize + size + NeedleChecksumSize) % NeedlePaddingSize) + return NeedleHeaderSize + int64(size) + NeedleChecksumSize + int64(padding) +} + +func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (int64, error) { var indexSize int64 var e error if indexSize, e = verifyIndexFileIntegrity(indexFile); e != nil { - return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), e) + return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), e) } if indexSize == 0 { - return nil + return int64(SuperBlockSize), nil } var lastIdxEntry []byte if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleIndexSize); e != nil { - return fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e) + return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e) } key, offset, size := idxFileEntry(lastIdxEntry) - if offset == 0 { - return nil + if offset == 0 || size == TombstoneFileSize { + return 0, nil } if e = verifyNeedleIntegrity(v.dataFile, v.Version(), int64(offset)*NeedlePaddingSize, key, size); e != nil { - return fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e) + return 0, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e) } - return nil + + return int64(offset)*int64(NeedlePaddingSize) + getActualSize(size), nil } func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) { diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index f2099de83..7bc65a4a3 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -64,7 +64,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e) } } - if e = CheckVolumeDataIntegrity(v, indexFile); e != nil { + if v.dataFileSize, e = CheckVolumeDataIntegrity(v, indexFile); e != nil { v.readOnly = true glog.V(0).Infof("volumeDataIntegrityChecking failed %v", e) } @@ -86,6 +86,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind } } } + return e } diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index 7458b4879..66f18557f 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -60,6 +60,8 @@ func (v *Volume) AppendBlob(b []byte) (offset int64, err error) { if offset, err = v.dataFile.Seek(0, 2); err != nil { glog.V(0).Infof("failed to seek the end of file: %v", err) return + } else if offset != int64(v.dataFileSize) { + glog.V(0).Infof("dataFileSize %d != actual data file size: %d", v.dataFileSize, offset) } //ensure file writing starting from aligned positions if offset%NeedlePaddingSize != 0 { @@ -67,9 +69,12 @@ func (v *Volume) AppendBlob(b []byte) (offset int64, err error) { if offset, err = v.dataFile.Seek(offset, 0); err != nil { glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err) return + } else if offset != int64(v.dataFileSize) { + glog.V(0).Infof("dataFileSize %d != actual data file size: %d", v.dataFileSize, offset) } } - v.dataFile.Write(b) + _, err = v.dataFile.Write(b) + v.dataFileSize += int64(len(b)) return } @@ -86,10 +91,12 @@ func (v *Volume) writeNeedle(n *Needle) (size uint32, err error) { glog.V(4).Infof("needle is unchanged!") return } - var offset int64 + var offset, actualSize int64 if offset, err = v.dataFile.Seek(0, 2); err != nil { glog.V(0).Infof("failed to seek the end of file: %v", err) return + } else if offset != int64(v.dataFileSize) { + glog.V(0).Infof("dataFileSize %d != actual data file size: %d", v.dataFileSize, offset) } //ensure file writing starting from aligned positions @@ -101,12 +108,14 @@ func (v *Volume) writeNeedle(n *Needle) (size uint32, err error) { } } - if size, err = n.Append(v.dataFile, v.Version()); err != nil { + if size, actualSize, err = n.Append(v.dataFile, v.Version()); err != nil { if e := v.dataFile.Truncate(offset); e != nil { err = fmt.Errorf("%s\ncannot truncate %s: %v", err, v.dataFile.Name(), e) } return } + v.dataFileSize += actualSize + nv, ok := v.nm.Get(n.Id) if !ok || int64(nv.Offset)*NeedlePaddingSize < offset { if err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil { @@ -128,16 +137,20 @@ func (v *Volume) deleteNeedle(n *Needle) (uint32, error) { defer v.dataFileAccessLock.Unlock() nv, ok := v.nm.Get(n.Id) //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size) - if ok { + if ok && nv.Size != TombstoneFileSize { size := nv.Size - if err := v.nm.Delete(n.Id); err != nil { + // println("adding tombstone", n.Id, "at offset", v.dataFileSize) + if err := v.nm.Delete(n.Id, uint32(v.dataFileSize/NeedlePaddingSize)); err != nil { return size, err } - if _, err := v.dataFile.Seek(0, 2); err != nil { + if offset, err := v.dataFile.Seek(0, 2); err != nil { return size, err + } else if offset != int64(v.dataFileSize) { + glog.V(0).Infof("dataFileSize %d != actual data file size: %d, deleteMarker: %d", v.dataFileSize, offset, getActualSize(0)) } n.Data = nil - _, err := n.Append(v.dataFile, v.Version()) + _, actualSize, err := n.Append(v.dataFile, v.Version()) + v.dataFileSize += actualSize return size, err } return 0, nil @@ -149,6 +162,9 @@ func (v *Volume) readNeedle(n *Needle) (int, error) { if !ok || nv.Offset == 0 { return -1, errors.New("Not Found") } + if nv.Size == TombstoneFileSize { + return -1, errors.New("Already Deleted") + } err := n.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) if err != nil { return 0, err diff --git a/weed/storage/volume_super_block.go b/weed/storage/volume_super_block.go index fc773273d..ae6ee7c25 100644 --- a/weed/storage/volume_super_block.go +++ b/weed/storage/volume_super_block.go @@ -56,6 +56,7 @@ func (v *Volume) maybeWriteSuperBlock() error { } } } + v.dataFileSize = SuperBlockSize } return e } diff --git a/weed/storage/volume_sync.go b/weed/storage/volume_sync.go index 7448b856f..23d8db510 100644 --- a/weed/storage/volume_sync.go +++ b/weed/storage/volume_sync.go @@ -148,7 +148,7 @@ func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m CompactMap, la total := 0 err = operation.GetVolumeIdxEntries(volumeServer, vid.String(), func(key uint64, offset, size uint32) { // println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size) - if offset != 0 && size != 0 { + if offset > 0 && size != TombstoneFileSize { m.Set(Key(key), offset, size) } else { m.Delete(Key(key)) diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 723300557..f3ded5ff2 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -35,7 +35,7 @@ func (v *Volume) Compact2() error { } func (v *Volume) commitCompact() error { - glog.V(3).Infof("Committing vacuuming...") + glog.V(0).Infof("Committing vacuuming...") v.dataFileAccessLock.Lock() defer v.dataFileAccessLock.Unlock() glog.V(3).Infof("Got Committing lock...") @@ -189,7 +189,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI fakeDelNeedle := new(Needle) fakeDelNeedle.Id = key fakeDelNeedle.Cookie = 0x12345678 - _, err = fakeDelNeedle.Append(dst, v.Version()) + _, _, err = fakeDelNeedle.Append(dst, v.Version()) if err != nil { return } @@ -241,7 +241,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil { return fmt.Errorf("cannot put needle: %s", err) } - if _, err = n.Append(dst, v.Version()); err != nil { + if _, _, err := n.Append(dst, v.Version()); err != nil { return fmt.Errorf("cannot append needle: %s", err) } new_offset += n.DiskSize() @@ -280,7 +280,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) { new_offset := int64(SuperBlockSize) WalkIndexFile(oldIndexFile, func(key uint64, offset, size uint32) error { - if size <= 0 { + if offset == 0 || size == TombstoneFileSize { return nil } @@ -302,7 +302,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) { if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil { return fmt.Errorf("cannot put needle: %s", err) } - if _, err = n.Append(dst, v.Version()); err != nil { + if _, _, err = n.Append(dst, v.Version()); err != nil { return fmt.Errorf("cannot append needle: %s", err) } new_offset += n.DiskSize()