|
|
@ -7,6 +7,7 @@ import ( |
|
|
|
|
|
|
|
"github.com/chrislusf/seaweedfs/weed/glog" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/stats" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/storage/backend" |
|
|
|
idx2 "github.com/chrislusf/seaweedfs/weed/storage/idx" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/storage/needle" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/storage/needle_map" |
|
|
@ -76,10 +77,10 @@ func (v *Volume) CommitCompact() error { |
|
|
|
|
|
|
|
glog.V(3).Infof("Got volume %d committing lock...", v.Id) |
|
|
|
v.nm.Close() |
|
|
|
if err := v.dataFile.Close(); err != nil { |
|
|
|
if err := v.DataBackend.Close(); err != nil { |
|
|
|
glog.V(0).Infof("fail to close volume %d", v.Id) |
|
|
|
} |
|
|
|
v.dataFile = nil |
|
|
|
v.DataBackend = nil |
|
|
|
stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec() |
|
|
|
|
|
|
|
var e error |
|
|
@ -131,8 +132,8 @@ func (v *Volume) cleanupCompact() error { |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func fetchCompactRevisionFromDatFile(file *os.File) (compactRevision uint16, err error) { |
|
|
|
superBlock, err := ReadSuperBlock(file) |
|
|
|
func fetchCompactRevisionFromDatFile(datBackend backend.DataStorageBackend) (compactRevision uint16, err error) { |
|
|
|
superBlock, err := ReadSuperBlock(datBackend) |
|
|
|
if err != nil { |
|
|
|
return 0, err |
|
|
|
} |
|
|
@ -146,7 +147,8 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI |
|
|
|
defer oldIdxFile.Close() |
|
|
|
|
|
|
|
oldDatFile, err := os.Open(oldDatFileName) |
|
|
|
defer oldDatFile.Close() |
|
|
|
oldDatBackend := backend.NewDiskFile(oldDatFileName, oldDatFile) |
|
|
|
defer oldDatBackend.Close() |
|
|
|
|
|
|
|
if indexSize, err = verifyIndexFileIntegrity(oldIdxFile); err != nil { |
|
|
|
return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", oldIdxFileName, err) |
|
|
@ -155,7 +157,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
oldDatCompactRevision, err := fetchCompactRevisionFromDatFile(oldDatFile) |
|
|
|
oldDatCompactRevision, err := fetchCompactRevisionFromDatFile(oldDatBackend) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("fetchCompactRevisionFromDatFile src %s failed: %v", oldDatFile.Name(), err) |
|
|
|
} |
|
|
@ -196,7 +198,8 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI |
|
|
|
if dst, err = os.OpenFile(newDatFileName, os.O_RDWR, 0644); err != nil { |
|
|
|
return fmt.Errorf("open dat file %s failed: %v", newDatFileName, err) |
|
|
|
} |
|
|
|
defer dst.Close() |
|
|
|
dstDatBackend := backend.NewDiskFile(newDatFileName, dst) |
|
|
|
defer dstDatBackend.Close() |
|
|
|
|
|
|
|
if idx, err = os.OpenFile(newIdxFileName, os.O_RDWR, 0644); err != nil { |
|
|
|
return fmt.Errorf("open idx file %s failed: %v", newIdxFileName, err) |
|
|
@ -204,7 +207,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI |
|
|
|
defer idx.Close() |
|
|
|
|
|
|
|
var newDatCompactRevision uint16 |
|
|
|
newDatCompactRevision, err = fetchCompactRevisionFromDatFile(dst) |
|
|
|
newDatCompactRevision, err = fetchCompactRevisionFromDatFile(dstDatBackend) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("fetchCompactRevisionFromDatFile dst %s failed: %v", dst.Name(), err) |
|
|
|
} |
|
|
@ -235,7 +238,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI |
|
|
|
//even the needle cache in memory is hit, the need_bytes is correct
|
|
|
|
glog.V(4).Infof("file %d offset %d size %d", key, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size) |
|
|
|
var needleBytes []byte |
|
|
|
needleBytes, err = needle.ReadNeedleBlob(oldDatFile, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, v.Version()) |
|
|
|
needleBytes, err = needle.ReadNeedleBlob(oldDatBackend, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, v.Version()) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, err) |
|
|
|
} |
|
|
@ -247,7 +250,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI |
|
|
|
fakeDelNeedle.Id = key |
|
|
|
fakeDelNeedle.Cookie = 0x12345678 |
|
|
|
fakeDelNeedle.AppendAtNs = uint64(time.Now().UnixNano()) |
|
|
|
_, _, _, err = fakeDelNeedle.Append(dst, v.Version()) |
|
|
|
_, _, _, err = fakeDelNeedle.Append(dstDatBackend, v.Version()) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("append deleted %d failed: %v", key, err) |
|
|
|
} |
|
|
@ -267,7 +270,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI |
|
|
|
type VolumeFileScanner4Vacuum struct { |
|
|
|
version needle.Version |
|
|
|
v *Volume |
|
|
|
dst *os.File |
|
|
|
dstBackend backend.DataStorageBackend |
|
|
|
nm *NeedleMap |
|
|
|
newOffset int64 |
|
|
|
now uint64 |
|
|
@ -277,7 +280,7 @@ type VolumeFileScanner4Vacuum struct { |
|
|
|
func (scanner *VolumeFileScanner4Vacuum) VisitSuperBlock(superBlock SuperBlock) error { |
|
|
|
scanner.version = superBlock.Version() |
|
|
|
superBlock.CompactionRevision++ |
|
|
|
_, err := scanner.dst.Write(superBlock.Bytes()) |
|
|
|
_, err := scanner.dstBackend.WriteAt(superBlock.Bytes(), 0) |
|
|
|
scanner.newOffset = int64(superBlock.BlockSize()) |
|
|
|
return err |
|
|
|
|
|
|
@ -296,7 +299,7 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in |
|
|
|
if err := scanner.nm.Put(n.Id, ToOffset(scanner.newOffset), n.Size); err != nil { |
|
|
|
return fmt.Errorf("cannot put needle: %s", err) |
|
|
|
} |
|
|
|
if _, _, _, err := n.Append(scanner.dst, scanner.v.Version()); err != nil { |
|
|
|
if _, _, _, err := n.Append(scanner.dstBackend, scanner.v.Version()); err != nil { |
|
|
|
return fmt.Errorf("cannot append needle: %s", err) |
|
|
|
} |
|
|
|
delta := n.DiskSize(scanner.version) |
|
|
@ -325,7 +328,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca |
|
|
|
v: v, |
|
|
|
now: uint64(time.Now().Unix()), |
|
|
|
nm: NewBtreeNeedleMap(idx), |
|
|
|
dst: dst, |
|
|
|
dstBackend: backend.NewDiskFile(dstName, dst), |
|
|
|
writeThrottler: util.NewWriteThrottler(compactionBytePerSecond), |
|
|
|
} |
|
|
|
err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, scanner) |
|
|
@ -339,7 +342,8 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) { |
|
|
|
if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil { |
|
|
|
return |
|
|
|
} |
|
|
|
defer dst.Close() |
|
|
|
dstDatBackend := backend.NewDiskFile(dstName, dst) |
|
|
|
defer dstDatBackend.Close() |
|
|
|
|
|
|
|
if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil { |
|
|
|
return |
|
|
@ -369,7 +373,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) { |
|
|
|
} |
|
|
|
|
|
|
|
n := new(needle.Needle) |
|
|
|
err := n.ReadData(v.dataFile, offset.ToAcutalOffset(), size, v.Version()) |
|
|
|
err := n.ReadData(v.DataBackend, offset.ToAcutalOffset(), size, v.Version()) |
|
|
|
if err != nil { |
|
|
|
return nil |
|
|
|
} |
|
|
@ -383,7 +387,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) { |
|
|
|
if err = nm.Put(n.Id, ToOffset(newOffset), n.Size); err != nil { |
|
|
|
return fmt.Errorf("cannot put needle: %s", err) |
|
|
|
} |
|
|
|
if _, _, _, err = n.Append(dst, v.Version()); err != nil { |
|
|
|
if _, _, _, err = n.Append(dstDatBackend, v.Version()); err != nil { |
|
|
|
return fmt.Errorf("cannot append needle: %s", err) |
|
|
|
} |
|
|
|
newOffset += n.DiskSize(v.Version()) |
|
|
|