package erasure_coding

import (
	"fmt"
	"io"
	"os"

	"github.com/seaweedfs/seaweedfs/weed/storage/backend"
	"github.com/seaweedfs/seaweedfs/weed/storage/idx"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

// WriteIdxFileFromEcIndex writes a .idx file from the .ecx and .ecj files.
func WriteIdxFileFromEcIndex(baseFileName string) (err error) {
	return WriteIdxFileFromEcIndexToTarget(baseFileName, baseFileName)
}

// WriteIdxFileFromEcIndexToTarget writes a .idx file from the .ecx and .ecj
// files, allowing the source and target base names to differ.
func WriteIdxFileFromEcIndexToTarget(sourceBaseName, targetBaseName string) (err error) {
	ecxFile, openErr := os.OpenFile(sourceBaseName+".ecx", os.O_RDONLY, 0644)
	if openErr != nil {
		return fmt.Errorf("cannot open ec index %s.ecx: %v", sourceBaseName, openErr)
	}
	defer ecxFile.Close()

	idxFile, openErr := os.OpenFile(targetBaseName+".idx", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
	if openErr != nil {
		return fmt.Errorf("cannot open %s.idx: %v", targetBaseName, openErr)
	}
	defer idxFile.Close()

	// The .ecx file is already in .idx entry format, so copy it verbatim.
	if _, copyErr := io.Copy(idxFile, ecxFile); copyErr != nil {
		return fmt.Errorf("cannot copy %s.ecx to %s.idx: %v", sourceBaseName, targetBaseName, copyErr)
	}

	// Append one tombstone entry per needle recorded as deleted in the .ecj
	// journal, so the resulting .idx reflects the deletions.
	err = iterateEcjFile(sourceBaseName, func(key types.NeedleId) error {
		bytes := needle_map.ToBytes(key, types.Offset{}, types.TombstoneFileSize)
		if _, writeErr := idxFile.Write(bytes); writeErr != nil {
			return fmt.Errorf("cannot write tombstone entry: %v", writeErr)
		}
		return nil
	})

	return err
}
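
// A minimal usage sketch (paths are illustrative, not from this package):
// regenerating the .idx for a volume with base name "/data/3", assuming
// /data/3.ecx and optionally /data/3.ecj exist:
//
//	if err := WriteIdxFileFromEcIndex("/data/3"); err != nil {
//		// handle error
//	}
//
// The resulting /data/3.idx holds the original index entries followed by one
// tombstone entry per journaled deletion.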

// FindDatFileSize calculates the .dat file size from the index entry with the
// highest stop offset. There may be extra deletion entries recorded after that
// one, but deletions never extend the file.
func FindDatFileSize(dataBaseFileName, indexBaseFileName string) (datSize int64, err error) {
	version, err := readEcVolumeVersion(dataBaseFileName)
	if err != nil {
		return 0, fmt.Errorf("read ec volume %s version: %v", dataBaseFileName, err)
	}

	err = iterateEcxFile(indexBaseFileName, func(key types.NeedleId, offset types.Offset, size types.Size) error {
		if size.IsDeleted() {
			return nil
		}

		// The file must extend at least to the end of this live needle.
		entryStopOffset := offset.ToActualOffset() + needle.GetActualSize(size, version)
		if datSize < entryStopOffset {
			datSize = entryStopOffset
		}

		return nil
	})

	return
}
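
// For example (illustrative numbers only): if the live entry with the largest
// offset starts at actual offset 8388608 and needle.GetActualSize reports
// 1048576 bytes for its size and version, the computed .dat size is
// 8388608 + 1048576 = 9437184 bytes. Deleted entries are skipped because
// tombstones journaled after that point never extend the file.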

func readEcVolumeVersion(baseFileName string) (version needle.Version, err error) {
	// The first data shard (.ec00) starts with the original volume's
	// superblock, which records the needle version.
	datFile, err := os.OpenFile(baseFileName+".ec00", os.O_RDONLY, 0644)
	if err != nil {
		return 0, fmt.Errorf("open ec volume %s superblock: %v", baseFileName, err)
	}
	datBackend := backend.NewDiskFile(datFile)

	superBlock, err := super_block.ReadSuperBlock(datBackend)
	datBackend.Close()
	if err != nil {
		return 0, fmt.Errorf("read ec volume %s superblock: %v", baseFileName, err)
	}

	return superBlock.Version, nil
}

func iterateEcxFile(baseFileName string, processNeedleFn func(key types.NeedleId, offset types.Offset, size types.Size) error) error {
	ecxFile, openErr := os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644)
	if openErr != nil {
		return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr)
	}
	defer ecxFile.Close()

	buf := make([]byte, types.NeedleMapEntrySize)
	for {
		n, err := ecxFile.Read(buf)
		if n != types.NeedleMapEntrySize {
			// A clean EOF ends the iteration; a short read is an error.
			if err == io.EOF {
				return nil
			}
			return err
		}
		key, offset, size := idx.IdxFileEntry(buf)
		if processNeedleFn != nil {
			err = processNeedleFn(key, offset, size)
		}
		if err != nil {
			// The callback may return io.EOF to stop the iteration early
			// without reporting an error.
			if err != io.EOF {
				return err
			}
			return nil
		}
	}
}
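
// Note (a sketch of the on-disk layout, as decoded by idx.IdxFileEntry): each
// .ecx entry is a fixed-width record of types.NeedleMapEntrySize bytes holding
// the needle id, offset, and size. The exact field widths depend on the
// build's configured offset size, so always read in NeedleMapEntrySize units
// rather than assuming a particular byte layout.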

func iterateEcjFile(baseFileName string, processNeedleFn func(key types.NeedleId) error) error {
	// A missing .ecj file simply means no deletions were journaled.
	if !util.FileExists(baseFileName + ".ecj") {
		return nil
	}
	ecjFile, openErr := os.OpenFile(baseFileName+".ecj", os.O_RDONLY, 0644)
	if openErr != nil {
		return fmt.Errorf("cannot open ec index %s.ecj: %v", baseFileName, openErr)
	}
	defer ecjFile.Close()

	buf := make([]byte, types.NeedleIdSize)
	for {
		n, err := ecjFile.Read(buf)
		if n != types.NeedleIdSize {
			if err == io.EOF {
				return nil
			}
			return err
		}
		if processNeedleFn != nil {
			err = processNeedleFn(types.BytesToNeedleId(buf))
		}
		if err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}
	}
}

// IterateEcjFile iterates through the deleted needle ids in an EC journal
// (.ecj) file. It is the public interface for reading .ecj files; a missing
// .ecj file is treated as having no deletions.
func IterateEcjFile(baseFileName string, processNeedleFn func(key types.NeedleId) error) error {
	return iterateEcjFile(baseFileName, processNeedleFn)
}
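
// A minimal usage sketch (path is illustrative): counting the journaled
// deletions for a volume:
//
//	var deleted int
//	err := IterateEcjFile("/data/3", func(key types.NeedleId) error {
//		deleted++
//		return nil
//	})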

// WriteDatFile generates the .dat file from the .ec00 ~ .ec09 data shard files.
func WriteDatFile(baseFileName string, datFileSize int64, shardFileNames []string) error {
	datFile, openErr := os.OpenFile(baseFileName+".dat", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
	if openErr != nil {
		return fmt.Errorf("cannot write volume %s.dat: %v", baseFileName, openErr)
	}
	defer datFile.Close()

	inputFiles := make([]*os.File, DataShardsCount)

	defer func() {
		for shardId := 0; shardId < DataShardsCount; shardId++ {
			if inputFiles[shardId] != nil {
				inputFiles[shardId].Close()
			}
		}
	}()

	for shardId := 0; shardId < DataShardsCount; shardId++ {
		inputFiles[shardId], openErr = os.OpenFile(shardFileNames[shardId], os.O_RDONLY, 0)
		if openErr != nil {
			return openErr
		}
	}

	// The encoder stripes the volume across the data shards in large blocks
	// first; copy those back in shard order while at least one full stripe of
	// large blocks remains.
	for datFileSize >= DataShardsCount*ErasureCodingLargeBlockSize {
		for shardId := 0; shardId < DataShardsCount; shardId++ {
			w, err := io.CopyN(datFile, inputFiles[shardId], ErasureCodingLargeBlockSize)
			if w != ErasureCodingLargeBlockSize {
				return fmt.Errorf("copy %s large block on shardId %d: %v", baseFileName, shardId, err)
			}
			datFileSize -= ErasureCodingLargeBlockSize
		}
	}

	// The tail of the volume is striped in small blocks; the final block may
	// be shorter than ErasureCodingSmallBlockSize.
	for datFileSize > 0 {
		for shardId := 0; shardId < DataShardsCount; shardId++ {
			toRead := min(datFileSize, ErasureCodingSmallBlockSize)
			w, err := io.CopyN(datFile, inputFiles[shardId], toRead)
			if w != toRead {
				return fmt.Errorf("copy %s small block %d: %v", baseFileName, shardId, err)
			}
			datFileSize -= toRead
		}
	}

	return nil
}
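
// A minimal reconstruction sketch (paths are illustrative): rebuilding
// /data/3.dat from the ten data shards /data/3.ec00 ~ /data/3.ec09:
//
//	shards := make([]string, DataShardsCount)
//	for i := range shards {
//		shards[i] = fmt.Sprintf("/data/3.ec%02d", i)
//	}
//	size, err := FindDatFileSize("/data/3", "/data/3")
//	if err == nil {
//		err = WriteDatFile("/data/3", size, shards)
//	}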

func min(x, y int64) int64 {
	if x > y {
		return y
	}
	return x
}

// WriteDatFileAndVacuum reconstructs a volume from its EC shards and then
// vacuums the deleted needles. It reuses the existing WriteDatFile and volume
// compaction logic to achieve the same result more cleanly, and creates
// cleaned volume files (without a generation suffix) that are ready for
// generational EC encoding.
func WriteDatFileAndVacuum(baseFileName string, shardFileNames []string) error {
	// Step 1: use WriteDatFile to reconstruct the full volume.
	datFileSize, err := FindDatFileSize(baseFileName, baseFileName)
	if err != nil {
		return fmt.Errorf("failed to find dat file size: %w", err)
	}

	tempDatFile := baseFileName + ".tmp.dat"
	tempBaseName := baseFileName + ".tmp" // WriteDatFile expects the base name without the .dat extension
	err = WriteDatFile(tempBaseName, datFileSize, shardFileNames)
	if err != nil {
		return fmt.Errorf("failed to reconstruct volume with WriteDatFile: %w", err)
	}
	defer os.Remove(tempDatFile) // clean up the temp file

	// Step 2: create an index file with deleted entries marked, reading the
	// actual .ecx/.ecj files directly.
	tempIdxFile := baseFileName + ".tmp.idx"
	err = WriteIdxFileFromEcIndexToTarget(baseFileName, tempBaseName) // read from the actual files, write the temp idx
	if err != nil {
		return fmt.Errorf("failed to create index file: %w", err)
	}
	defer os.Remove(tempIdxFile) // clean up the temp file

	// Step 3: use the existing volume compaction logic to filter out the
	// deleted needles.
	version, err := readEcVolumeVersion(baseFileName)
	if err != nil {
		return fmt.Errorf("failed to read volume version: %w", err)
	}

	// Create cleaned volume files (without a generation suffix); these are
	// later copied to generation-aware names by encodeVolumeToEcShards().
	return copyDataBasedOnIndexFileForEcVacuum(
		tempDatFile, tempIdxFile, // source files (with deleted entries)
		baseFileName+".dat", baseFileName+".idx", // destination files (cleaned, ready for generational encoding)
		version,
	)
}
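
// Hedged usage sketch: with shard paths built as in the WriteDatFile example
// above,
//
//	err := WriteDatFileAndVacuum("/data/3", shards)
//
// leaves /data/3.dat and /data/3.idx containing only live needles, at the
// cost of writing one full temporary copy of the volume (/data/3.tmp.dat).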

// copyDataBasedOnIndexFileForEcVacuum copies only the non-deleted needles from
// the source volume files to the destination. It is a simplified version of
// copyDataBasedOnIndexFile in volume_vacuum.go, tailored for EC vacuum use.
func copyDataBasedOnIndexFileForEcVacuum(srcDatName, srcIdxName, dstDatName, dstIdxName string, version needle.Version) error {
	// Open the source data file.
	dataFile, err := os.Open(srcDatName)
	if err != nil {
		return fmt.Errorf("failed to open source dat file: %w", err)
	}
	srcDatBackend := backend.NewDiskFile(dataFile)
	defer srcDatBackend.Close()

	// Create the destination data file.
	dstDatBackend, err := backend.CreateVolumeFile(dstDatName, 0, 0)
	if err != nil {
		return fmt.Errorf("failed to create destination dat file: %w", err)
	}
	defer func() {
		dstDatBackend.Sync()
		dstDatBackend.Close()
	}()

	// Load the needle map from the source index.
	oldNm := needle_map.NewMemDb()
	defer oldNm.Close()
	if err := oldNm.LoadFromIdx(srcIdxName); err != nil {
		return fmt.Errorf("failed to load index file: %w", err)
	}

	// Create a new needle map for the cleaned volume.
	newNm := needle_map.NewMemDb()
	defer newNm.Close()

	// Copy the superblock, incrementing its compaction revision.
	sb := super_block.SuperBlock{}
	if existingSb, err := super_block.ReadSuperBlock(srcDatBackend); err == nil {
		sb = existingSb
		sb.CompactionRevision++
	} else {
		// Fall back to a default superblock if reading fails.
		sb = super_block.SuperBlock{
			Version:            version,
			ReplicaPlacement:   &super_block.ReplicaPlacement{},
			CompactionRevision: 1,
		}
	}

	if _, err := dstDatBackend.WriteAt(sb.Bytes(), 0); err != nil {
		return fmt.Errorf("failed to write superblock: %w", err)
	}
	newOffset := int64(sb.BlockSize())

	// Copy only the non-deleted needles, in ascending key order.
	err = oldNm.AscendingVisit(func(value needle_map.NeedleValue) error {
		offset, size := value.Offset, value.Size

		// Skip deleted needles; this filtering is what performs the vacuum.
		if offset.IsZero() || size.IsDeleted() {
			return nil
		}

		// Read the needle from the source volume.
		n := new(needle.Needle)
		if err := n.ReadData(srcDatBackend, offset.ToActualOffset(), size, version); err != nil {
			return fmt.Errorf("cannot read needle from source: %w", err)
		}

		// Record the needle at its new offset and append it to the destination.
		if err := newNm.Set(n.Id, types.ToOffset(newOffset), n.Size); err != nil {
			return fmt.Errorf("cannot set needle in new map: %w", err)
		}
		if _, _, _, err := n.Append(dstDatBackend, sb.Version); err != nil {
			return fmt.Errorf("cannot append needle to destination: %w", err)
		}

		newOffset += n.DiskSize(version)
		return nil
	})

	if err != nil {
		return fmt.Errorf("failed to copy needles: %w", err)
	}

	// Persist the new index file.
	return newNm.SaveToIdx(dstIdxName)
}