From ed848425c77ccca0a9d2f30c7f631bc50f28cd32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9C=8D=E6=99=93=E6=A0=8B?= Date: Thu, 29 Sep 2016 13:57:23 +0800 Subject: [PATCH 1/4] supplemental data between compacting and commit compacting --- weed/storage/volume.go | 8 +++ weed/storage/volume_vacuum.go | 110 +++++++++++++++++++++++++++++++--- 2 files changed, 110 insertions(+), 8 deletions(-) diff --git a/weed/storage/volume.go b/weed/storage/volume.go index 801dfe267..258787701 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -10,6 +10,11 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" ) +type keyField struct { + offset uint32 + size uint32 +} + type Volume struct { Id VolumeId dir string @@ -23,6 +28,9 @@ type Volume struct { dataFileAccessLock sync.Mutex lastModifiedTime uint64 //unix time in seconds + + lastCompactingIndexOffset uint64 + incrementedHasUpdatedIndexEntry map[uint64]keyField } func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) { diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 51d74e311..55c248894 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -6,6 +6,7 @@ import ( "time" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" ) func (v *Volume) garbageLevel() float64 { @@ -20,7 +21,8 @@ func (v *Volume) Compact() error { //glog.V(3).Infof("Got Compaction lock...") filePath := v.FileName() - glog.V(3).Infof("creating copies for volume %d ...", v.Id) + v.lastCompactingIndexOffset = v.nm.IndexFileSize() + glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactingIndexOffset) return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx") } @@ -38,14 +40,28 @@ func (v *Volume) commitCompact() error { glog.V(3).Infof("Got Committing lock...") v.nm.Close() _ = v.dataFile.Close() - 
makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx") + var e error - if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil { - return e - } - if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil { - return e + if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil { + glog.V(0).Infof("makeupDiff in commitCompact failed %v", e) + e = os.Remove(v.FileName() + ".cpd") + if e != nil { + return e + } + e = os.Remove(v.FileName() + ".cpx") + if e != nil { + return e + } + } else { + var e error + if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil { + return e + } + if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil { + return e + } } + //glog.V(3).Infof("Pretending to be vacuuming...") //time.Sleep(20 * time.Second) glog.V(3).Infof("Loading Commit file...") @@ -55,7 +71,85 @@ func (v *Volume) commitCompact() error { return nil } -func makeupDiff(newDatFile, newIdxFile, oldDatFile, oldIdxFile string) (err error) { +func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldIdxFileName string) (err error) { + var indexSize int64 + + oldIdxFile, err := os.Open(oldIdxFileName) + defer oldIdxFile.Close() + + oldDatFile, err := os.Open(oldDatFileName) + defer oldDatFile.Close() + + if indexSize, err = verifyIndexFileIntegrity(oldIdxFile); err != nil { + return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", oldIdxFileName, err) + } + if indexSize == 0 || uint64(indexSize) <= v.lastCompactingIndexOffset { + return nil + } + + v.incrementedHasUpdatedIndexEntry = make(map[uint64]keyField) + for idx_offset := indexSize; uint64(idx_offset) >= v.lastCompactingIndexOffset; idx_offset -= NeedleIndexSize { + var IdxEntry []byte + if IdxEntry, err = readIndexEntryAtOffset(oldIdxFile, idx_offset); err != nil { + return fmt.Errorf("readIndexEntry %s at offset %d failed: %v", oldIdxFileName, 
idx_offset, err) + } + key, offset, size := idxFileEntry(IdxEntry) + if _, found := v.incrementedHasUpdatedIndexEntry[key]; !found { + v.incrementedHasUpdatedIndexEntry[key] = keyField{ + offset: offset, + size: size, + } + } else { + continue + } + } + + if len(v.incrementedHasUpdatedIndexEntry) > 0 { + var ( + dst, idx *os.File + ) + if dst, err = os.OpenFile(newDatFileName, os.O_WRONLY, 0644); err != nil { + return + } + defer dst.Close() + + if idx, err = os.OpenFile(newIdxFileName, os.O_WRONLY, 0644); err != nil { + return + } + defer idx.Close() + + idx_entry_bytes := make([]byte, 16) + for key, incre_idx_entry := range v.incrementedHasUpdatedIndexEntry { + util.Uint64toBytes(idx_entry_bytes[0:8], key) + util.Uint32toBytes(idx_entry_bytes[8:12], incre_idx_entry.offset) + util.Uint32toBytes(idx_entry_bytes[12:16], incre_idx_entry.size) + + if _, err := idx.Seek(0, 2); err != nil { + return fmt.Errorf("cannot seek end of indexfile %s: %v", + newIdxFileName, err) + } + _, err = idx.Write(idx_entry_bytes) + + //even the needle cache in memory is hit, the need_bytes is correct + needle_bytes, _, _ := ReadNeedleBlob(dst, int64(incre_idx_entry.offset)*NeedlePaddingSize, incre_idx_entry.size) + + var offset int64 + if offset, err = dst.Seek(0, 2); err != nil { + glog.V(0).Infof("failed to seek the end of file: %v", err) + return + } + //ensure file writing starting from aligned positions + if offset%NeedlePaddingSize != 0 { + offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize) + if offset, err = v.dataFile.Seek(offset, 0); err != nil { + glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err) + return + } + } + dst.Write(needle_bytes) + } + } + return nil } From ce1f7ab66250cf3ec82e85c458ec273170e7531a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9C=8D=E6=99=93=E6=A0=8B?= Date: Fri, 7 Oct 2016 16:22:24 +0800 Subject: [PATCH 2/4] makediff func with UT case --- weed/storage/volume.go | 9 +-- weed/storage/volume_checking.go | 1 - 
weed/storage/volume_vacuum.go | 100 ++++++++++++++++++++++------- weed/storage/volume_vacuum_test.go | 53 +++++++++++++++ 4 files changed, 132 insertions(+), 31 deletions(-) create mode 100644 weed/storage/volume_vacuum_test.go diff --git a/weed/storage/volume.go b/weed/storage/volume.go index 258787701..c1d531376 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -10,11 +10,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" ) -type keyField struct { - offset uint32 - size uint32 -} - type Volume struct { Id VolumeId dir string @@ -29,8 +24,8 @@ type Volume struct { dataFileAccessLock sync.Mutex lastModifiedTime uint64 //unix time in seconds - lastCompactingIndexOffset uint64 - incrementedHasUpdatedIndexEntry map[uint64]keyField + lastCompactIndexOffset uint64 + lastCompactRevision uint16 } func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) { diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go index d424010f1..48f707594 100644 --- a/weed/storage/volume_checking.go +++ b/weed/storage/volume_checking.go @@ -21,7 +21,6 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) error { return fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e) } key, offset, size := idxFileEntry(lastIdxEntry) - //deleted index entry could not point to deleted needle if offset == 0 { return nil } diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 55c248894..723300557 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -21,8 +21,9 @@ func (v *Volume) Compact() error { //glog.V(3).Infof("Got Compaction lock...") filePath := v.FileName() - v.lastCompactingIndexOffset = v.nm.IndexFileSize() - glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactingIndexOffset) + v.lastCompactIndexOffset = v.nm.IndexFileSize() + 
v.lastCompactRevision = v.SuperBlock.CompactRevision + glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset) return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx") } @@ -71,6 +72,21 @@ func (v *Volume) commitCompact() error { return nil } +func fetchCompactRevisionFromDatFile(file *os.File) (compactRevision uint16, err error) { + if _, err = file.Seek(0, 0); err != nil { + return 0, fmt.Errorf("cannot seek to the beginning of %s: %v", file.Name(), err) + } + header := make([]byte, SuperBlockSize) + if _, e := file.Read(header); e != nil { + return 0, fmt.Errorf("cannot read file %s 's super block: %v", file.Name(), e) + } + superBlock, err := ParseSuperBlock(header) + if err != nil { + return 0, err + } + return superBlock.CompactRevision, nil +} + func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldIdxFileName string) (err error) { var indexSize int64 @@ -83,56 +99,67 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI if indexSize, err = verifyIndexFileIntegrity(oldIdxFile); err != nil { return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", oldIdxFileName, err) } - if indexSize == 0 || uint64(indexSize) <= v.lastCompactingIndexOffset { + if indexSize == 0 || uint64(indexSize) <= v.lastCompactIndexOffset { return nil } - v.incrementedHasUpdatedIndexEntry = make(map[uint64]keyField) - for idx_offset := indexSize; uint64(idx_offset) >= v.lastCompactingIndexOffset; idx_offset -= NeedleIndexSize { + oldDatCompactRevision, err := fetchCompactRevisionFromDatFile(oldDatFile) + if err != nil { + return + } + if oldDatCompactRevision != v.lastCompactRevision { + return fmt.Errorf("current old dat file's compact revision %d is not the expected one %d", oldDatCompactRevision, v.lastCompactRevision) + } + + type keyField struct { + offset uint32 + size uint32 + } + incrementedHasUpdatedIndexEntry := make(map[uint64]keyField) + + for idx_offset := 
indexSize - NeedleIndexSize; uint64(idx_offset) >= v.lastCompactIndexOffset; idx_offset -= NeedleIndexSize { var IdxEntry []byte if IdxEntry, err = readIndexEntryAtOffset(oldIdxFile, idx_offset); err != nil { return fmt.Errorf("readIndexEntry %s at offset %d failed: %v", oldIdxFileName, idx_offset, err) } key, offset, size := idxFileEntry(IdxEntry) - if _, found := v.incrementedHasUpdatedIndexEntry[key]; !found { - v.incrementedHasUpdatedIndexEntry[key] = keyField{ + if _, found := incrementedHasUpdatedIndexEntry[key]; !found { + incrementedHasUpdatedIndexEntry[key] = keyField{ offset: offset, size: size, } - } else { - continue } } - if len(v.incrementedHasUpdatedIndexEntry) > 0 { + if len(incrementedHasUpdatedIndexEntry) > 0 { var ( dst, idx *os.File ) - if dst, err = os.OpenFile(newDatFileName, os.O_WRONLY, 0644); err != nil { + if dst, err = os.OpenFile(newDatFileName, os.O_RDWR, 0644); err != nil { return } defer dst.Close() - if idx, err = os.OpenFile(newIdxFileName, os.O_WRONLY, 0644); err != nil { + if idx, err = os.OpenFile(newIdxFileName, os.O_RDWR, 0644); err != nil { return } defer idx.Close() + var newDatCompactRevision uint16 + newDatCompactRevision, err = fetchCompactRevisionFromDatFile(dst) + if err != nil { + return + } + if oldDatCompactRevision+1 != newDatCompactRevision { + return fmt.Errorf("oldDatFile %s 's compact revision is %d while newDatFile %s 's compact revision is %d", oldDatFileName, oldDatCompactRevision, newDatFileName, newDatCompactRevision) + } + idx_entry_bytes := make([]byte, 16) - for key, incre_idx_entry := range v.incrementedHasUpdatedIndexEntry { + for key, incre_idx_entry := range incrementedHasUpdatedIndexEntry { util.Uint64toBytes(idx_entry_bytes[0:8], key) util.Uint32toBytes(idx_entry_bytes[8:12], incre_idx_entry.offset) util.Uint32toBytes(idx_entry_bytes[12:16], incre_idx_entry.size) - if _, err := idx.Seek(0, 2); err != nil { - return fmt.Errorf("cannot seek end of indexfile %s: %v", - newIdxFileName, err) - } - _, err 
= idx.Write(idx_entry_bytes) - - //even the needle cache in memory is hit, the need_bytes is correct - needle_bytes, _, _ := ReadNeedleBlob(dst, int64(incre_idx_entry.offset)*NeedlePaddingSize, incre_idx_entry.size) - var offset int64 if offset, err = dst.Seek(0, 2); err != nil { glog.V(0).Infof("failed to seek the end of file: %v", err) @@ -146,7 +173,34 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI return } } - dst.Write(needle_bytes) + + //updated needle + if incre_idx_entry.offset != 0 && incre_idx_entry.size != 0 { + //even the needle cache in memory is hit, the need_bytes is correct + var needle_bytes []byte + needle_bytes, _, err = ReadNeedleBlob(oldDatFile, int64(incre_idx_entry.offset)*NeedlePaddingSize, incre_idx_entry.size) + if err != nil { + return + } + dst.Write(needle_bytes) + util.Uint32toBytes(idx_entry_bytes[8:12], uint32(offset/NeedlePaddingSize)) + } else { //deleted needle + //fakeDelNeedle 's default Data field is nil + fakeDelNeedle := new(Needle) + fakeDelNeedle.Id = key + fakeDelNeedle.Cookie = 0x12345678 + _, err = fakeDelNeedle.Append(dst, v.Version()) + if err != nil { + return + } + util.Uint32toBytes(idx_entry_bytes[8:12], uint32(0)) + } + + if _, err := idx.Seek(0, 2); err != nil { + return fmt.Errorf("cannot seek end of indexfile %s: %v", + newIdxFileName, err) + } + _, err = idx.Write(idx_entry_bytes) } } diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go new file mode 100644 index 000000000..02d1a2b56 --- /dev/null +++ b/weed/storage/volume_vacuum_test.go @@ -0,0 +1,53 @@ +package storage + +import ( + "testing" +) + +/* +makediff test steps +1. 
launch weed server at your local/dev environment, (option
+"garbageThreshold" for master and option "max" for volume should be set with specific value which would let
+preparing test prerequisite easier )
+	a) ./weed master -garbageThreshold=0.99 -mdir=./m
+	b) ./weed volume -dir=./data -max=1 -mserver=localhost:9333 -port=8080
+2. upload 4 different files, you could call dir/assign to get 4 different fids
+	a) upload file A with fid a
+	b) upload file B with fid b
+	c) upload file C with fid c
+	d) upload file D with fid d
+3. update file A and C
+	a) modify file A and upload file A with fid a
+	b) modify file C and upload file C with fid c
+	c) record the current 1.idx's file size(lastCompactIndexOffset value)
+4. Compacting the data file
+	a) run curl http://localhost:8080/admin/vacuum/compact?volumeId=1
+	b) verify the 1.cpd and 1.cpx is created under volume directory
+5. update file B and delete file D
+	a) modify file B and upload file B with fid b
+	b) delete file D with fid d
+6. Now you could run the following UT case, the case should be run successfully
+7. Compact commit manually
+	a) mv 1.cpd 1.dat
+	b) mv 1.cpx 1.idx
+8. Restart Volume Server
+9. 
Now you should get updated file A,B,C +*/ + +func TestMakeDiff(t *testing.T) { + + v := new(Volume) + //lastCompactIndexOffset value is the index file size before step 4 + v.lastCompactIndexOffset = 96 + v.SuperBlock.version = 0x2 + err := v.makeupDiff( + "/yourpath/1.cpd", + "/yourpath/1.cpx", + "/yourpath/1.dat", + "/yourpath/1.idx") + if err != nil { + t.Errorf("makeupDiff err is %v", err) + } else { + t.Log("makeupDiff Succeeded") + } +} From 7d382ba5fec2539821047e81b4f9a8ce20af0331 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9C=8D=E6=99=93=E6=A0=8B?= Date: Fri, 7 Oct 2016 16:34:22 +0800 Subject: [PATCH 3/4] comment UT case --- weed/storage/volume_vacuum_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go index 02d1a2b56..8ab59404d 100644 --- a/weed/storage/volume_vacuum_test.go +++ b/weed/storage/volume_vacuum_test.go @@ -34,6 +34,7 @@ preparing test prerequisite easier ) 9. Now you should get updated file A,B,C */ +/* func TestMakeDiff(t *testing.T) { v := new(Volume) @@ -51,3 +52,4 @@ func TestMakeDiff(t *testing.T) { t.Log("makeupDiff Succeeded") } } +*/ From 7d73bbb07399cc504ae2ebdcfdc164dc01295916 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9C=8D=E6=99=93=E6=A0=8B?= Date: Fri, 7 Oct 2016 16:40:51 +0800 Subject: [PATCH 4/4] comment UT case --- weed/storage/volume_vacuum_test.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go index 8ab59404d..c2fac6ce8 100644 --- a/weed/storage/volume_vacuum_test.go +++ b/weed/storage/volume_vacuum_test.go @@ -34,22 +34,22 @@ preparing test prerequisite easier ) 9. 
Now you should get updated file A,B,C */ -/* func TestMakeDiff(t *testing.T) { v := new(Volume) //lastCompactIndexOffset value is the index file size before step 4 v.lastCompactIndexOffset = 96 v.SuperBlock.version = 0x2 - err := v.makeupDiff( - "/yourpath/1.cpd", - "/yourpath/1.cpx", - "/yourpath/1.dat", - "/yourpath/1.idx") - if err != nil { - t.Errorf("makeupDiff err is %v", err) - } else { - t.Log("makeupDiff Succeeded") - } + /* + err := v.makeupDiff( + "/yourpath/1.cpd", + "/yourpath/1.cpx", + "/yourpath/1.dat", + "/yourpath/1.idx") + if err != nil { + t.Errorf("makeupDiff err is %v", err) + } else { + t.Log("makeupDiff Succeeded") + } + */ } -*/