package erasure_coding

import (
	"fmt"
	"io"
	"os"

	"github.com/seaweedfs/seaweedfs/weed/storage/backend"
	"github.com/seaweedfs/seaweedfs/weed/storage/idx"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
	"github.com/seaweedfs/seaweedfs/weed/util"
)
// WriteIdxFileFromEcIndex writes a .idx file from the .ecx and .ecj files.
func WriteIdxFileFromEcIndex(baseFileName string) (err error) {
	return WriteIdxFileFromEcIndexToTarget(baseFileName, baseFileName)
}

// WriteIdxFileFromEcIndexToTarget writes a .idx file from the .ecx and .ecj
// files, allowing the source and target base names to differ.
func WriteIdxFileFromEcIndexToTarget(sourceBaseName, targetBaseName string) (err error) {
	ecxFile, openErr := os.OpenFile(sourceBaseName+".ecx", os.O_RDONLY, 0644)
	if openErr != nil {
		return fmt.Errorf("cannot open ec index %s.ecx: %v", sourceBaseName, openErr)
	}
	defer ecxFile.Close()

	idxFile, openErr := os.OpenFile(targetBaseName+".idx", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
	if openErr != nil {
		return fmt.Errorf("cannot open %s.idx: %v", targetBaseName, openErr)
	}
	defer idxFile.Close()

	// Copy every index entry, then append a tombstone entry for each needle
	// recorded as deleted in the journal.
	if _, copyErr := io.Copy(idxFile, ecxFile); copyErr != nil {
		return fmt.Errorf("copy %s.ecx to %s.idx: %v", sourceBaseName, targetBaseName, copyErr)
	}

	err = iterateEcjFile(sourceBaseName, func(key types.NeedleId) error {
		bytes := needle_map.ToBytes(key, types.Offset{}, types.TombstoneFileSize)
		if _, writeErr := idxFile.Write(bytes); writeErr != nil {
			return fmt.Errorf("write tombstone entry: %v", writeErr)
		}
		return nil
	})
	return err
}
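
// A minimal usage sketch (hypothetical volume path; assumes /data/3.ecx
// exists, with /data/3.ecj optional):
//
//	if err := WriteIdxFileFromEcIndex("/data/3"); err != nil {
//		// handle the error
//	}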
// FindDatFileSize calculates the .dat file size from the index entry with the
// largest stop offset. There may be extra deletion entries after that entry,
// but deletions never extend the file.
func FindDatFileSize(dataBaseFileName, indexBaseFileName string) (datSize int64, err error) {
	version, err := readEcVolumeVersion(dataBaseFileName)
	if err != nil {
		return 0, fmt.Errorf("read ec volume %s version: %v", dataBaseFileName, err)
	}

	err = iterateEcxFile(indexBaseFileName, func(key types.NeedleId, offset types.Offset, size types.Size) error {
		if size.IsDeleted() {
			return nil
		}

		entryStopOffset := offset.ToActualOffset() + needle.GetActualSize(size, version)
		if datSize < entryStopOffset {
			datSize = entryStopOffset
		}
		return nil
	})
	return
}
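
// For illustration (hypothetical numbers): if the live entry with the largest
// stop offset sits at actual offset 1024 and occupies 4096 bytes on disk,
// FindDatFileSize returns 1024+4096 = 5120.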
func readEcVolumeVersion(baseFileName string) (version needle.Version, err error) {
	// The volume version lives in the superblock, which sits at the start of
	// the first data shard (.ec00).
	datFile, err := os.OpenFile(baseFileName+".ec00", os.O_RDONLY, 0644)
	if err != nil {
		return 0, fmt.Errorf("open ec volume %s superblock: %v", baseFileName, err)
	}
	datBackend := backend.NewDiskFile(datFile)
	defer datBackend.Close()

	superBlock, err := super_block.ReadSuperBlock(datBackend)
	if err != nil {
		return 0, fmt.Errorf("read ec volume %s superblock: %v", baseFileName, err)
	}
	return superBlock.Version, nil
}
func iterateEcxFile(baseFileName string, processNeedleFn func(key types.NeedleId, offset types.Offset, size types.Size) error) error {
	ecxFile, openErr := os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644)
	if openErr != nil {
		return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr)
	}
	defer ecxFile.Close()

	buf := make([]byte, types.NeedleMapEntrySize)
	for {
		n, err := ecxFile.Read(buf)
		if n != types.NeedleMapEntrySize {
			if err == io.EOF {
				return nil
			}
			return err
		}
		key, offset, size := idx.IdxFileEntry(buf)
		if processNeedleFn != nil {
			err = processNeedleFn(key, offset, size)
		}
		if err != nil {
			if err != io.EOF {
				return err
			}
			return nil
		}
	}
}
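
// Note on the on-disk formats read above and below: each .ecx entry is a
// fixed-size record of types.NeedleMapEntrySize bytes holding a needle key,
// offset, and size (parsed by idx.IdxFileEntry), while the .ecj journal is a
// flat list of deleted needle keys, types.NeedleIdSize bytes each.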
func iterateEcjFile(baseFileName string, processNeedleFn func(key types.NeedleId) error) error {
	// A missing journal simply means no needles were deleted.
	if !util.FileExists(baseFileName + ".ecj") {
		return nil
	}
	ecjFile, openErr := os.OpenFile(baseFileName+".ecj", os.O_RDONLY, 0644)
	if openErr != nil {
		return fmt.Errorf("cannot open ec journal %s.ecj: %v", baseFileName, openErr)
	}
	defer ecjFile.Close()

	buf := make([]byte, types.NeedleIdSize)
	for {
		n, err := ecjFile.Read(buf)
		if n != types.NeedleIdSize {
			if err == io.EOF {
				return nil
			}
			return err
		}
		if processNeedleFn != nil {
			err = processNeedleFn(types.BytesToNeedleId(buf))
		}
		if err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}
	}
}
// IterateEcjFile iterates through the deleted needle IDs in an EC journal
// (.ecj) file. This is the public interface for reading .ecj files.
func IterateEcjFile(baseFileName string, processNeedleFn func(key types.NeedleId) error) error {
	return iterateEcjFile(baseFileName, processNeedleFn)
}
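
// A minimal usage sketch (hypothetical base name), counting deletions:
//
//	deletedCount := 0
//	err := IterateEcjFile("/data/3", func(key types.NeedleId) error {
//		deletedCount++
//		return nil
//	})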
// WriteDatFile reconstructs the .dat file from the data shard files
// (.ec00 ~ .ec09), interleaving large blocks first and then small blocks.
func WriteDatFile(baseFileName string, datFileSize int64, shardFileNames []string) error {
	datFile, openErr := os.OpenFile(baseFileName+".dat", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
	if openErr != nil {
		return fmt.Errorf("cannot write volume %s.dat: %v", baseFileName, openErr)
	}
	defer datFile.Close()

	inputFiles := make([]*os.File, DataShardsCount)
	defer func() {
		for shardId := 0; shardId < DataShardsCount; shardId++ {
			if inputFiles[shardId] != nil {
				inputFiles[shardId].Close()
			}
		}
	}()
	for shardId := 0; shardId < DataShardsCount; shardId++ {
		inputFiles[shardId], openErr = os.OpenFile(shardFileNames[shardId], os.O_RDONLY, 0)
		if openErr != nil {
			return openErr
		}
	}

	// While at least one full row of large blocks remains, copy one large
	// block from each data shard in turn.
	for datFileSize >= DataShardsCount*ErasureCodingLargeBlockSize {
		for shardId := 0; shardId < DataShardsCount; shardId++ {
			w, err := io.CopyN(datFile, inputFiles[shardId], ErasureCodingLargeBlockSize)
			if w != ErasureCodingLargeBlockSize {
				return fmt.Errorf("copy %s large block on shardId %d: %v", baseFileName, shardId, err)
			}
			datFileSize -= ErasureCodingLargeBlockSize
		}
	}

	// The remainder is striped across the shards in small blocks.
	for datFileSize > 0 {
		for shardId := 0; shardId < DataShardsCount; shardId++ {
			toRead := min(datFileSize, ErasureCodingSmallBlockSize)
			w, err := io.CopyN(datFile, inputFiles[shardId], toRead)
			if w != toRead {
				return fmt.Errorf("copy %s small block on shardId %d: %v", baseFileName, shardId, err)
			}
			datFileSize -= toRead
		}
	}
	return nil
}
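
// Layout sketch, assuming the released constants (10 data shards, 1 GiB
// large blocks, 1 MiB small blocks): a 25 GiB volume is reassembled as two
// full rounds of one large block per shard (20 GiB), then rounds of one
// small block per shard for the remaining 5 GiB.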
// min returns the smaller of x and y.
func min(x, y int64) int64 {
	if x > y {
		return y
	}
	return x
}
// WriteDatFileAndVacuum reconstructs a volume from its EC shards and then
// vacuums deleted needles. It reuses the existing WriteDatFile and volume
// compaction logic, producing cleaned volume files (without a generation
// suffix) that are ready for generational EC encoding.
func WriteDatFileAndVacuum(baseFileName string, shardFileNames []string) error {
	// Step 1: use WriteDatFile to reconstruct the full volume.
	datFileSize, err := FindDatFileSize(baseFileName, baseFileName)
	if err != nil {
		return fmt.Errorf("failed to find dat file size: %w", err)
	}

	tempDatFile := baseFileName + ".tmp.dat"
	tempBaseName := baseFileName + ".tmp" // WriteDatFile expects the base name without the .dat extension
	if err = WriteDatFile(tempBaseName, datFileSize, shardFileNames); err != nil {
		return fmt.Errorf("failed to reconstruct volume with WriteDatFile: %w", err)
	}
	defer os.Remove(tempDatFile) // clean up the temporary data file

	// Step 2: create an index file with deleted entries marked, reading the
	// actual .ecx/.ecj files directly.
	tempIdxFile := baseFileName + ".tmp.idx"
	if err = WriteIdxFileFromEcIndexToTarget(baseFileName, tempBaseName); err != nil {
		return fmt.Errorf("failed to create index file: %w", err)
	}
	defer os.Remove(tempIdxFile) // clean up the temporary index file

	// Step 3: use the volume compaction logic to filter out deleted needles.
	version, err := readEcVolumeVersion(baseFileName)
	if err != nil {
		return fmt.Errorf("failed to read volume version: %w", err)
	}

	// Create cleaned volume files (without a generation suffix). These are
	// later copied to generation-aware names by encodeVolumeToEcShards().
	return copyDataBasedOnIndexFileForEcVacuum(
		tempDatFile, tempIdxFile, // source files (with deleted entries)
		baseFileName+".dat", baseFileName+".idx", // destination files (cleaned)
		version,
	)
}
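
// A minimal usage sketch (hypothetical paths; all ten data shard files must
// be present and ordered by shard id):
//
//	shards := make([]string, DataShardsCount)
//	for i := range shards {
//		shards[i] = fmt.Sprintf("/data/3.ec%02d", i)
//	}
//	if err := WriteDatFileAndVacuum("/data/3", shards); err != nil {
//		// handle the error
//	}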
// copyDataBasedOnIndexFileForEcVacuum copies only the non-deleted needles
// from the source volume to the destination. It is a simplified version of
// volume_vacuum.go's copyDataBasedOnIndexFile, for EC vacuum use.
func copyDataBasedOnIndexFileForEcVacuum(srcDatName, srcIdxName, dstDatName, dstIdxName string, version needle.Version) error {
	// Open the source data file.
	dataFile, err := os.Open(srcDatName)
	if err != nil {
		return fmt.Errorf("failed to open source dat file: %w", err)
	}
	srcDatBackend := backend.NewDiskFile(dataFile)
	defer srcDatBackend.Close()

	// Create the destination data file.
	dstDatBackend, err := backend.CreateVolumeFile(dstDatName, 0, 0)
	if err != nil {
		return fmt.Errorf("failed to create destination dat file: %w", err)
	}
	defer func() {
		dstDatBackend.Sync()
		dstDatBackend.Close()
	}()

	// Load the needle map from the source index.
	oldNm := needle_map.NewMemDb()
	defer oldNm.Close()
	if err := oldNm.LoadFromIdx(srcIdxName); err != nil {
		return fmt.Errorf("failed to load index file: %w", err)
	}

	// Create a new needle map for the cleaned volume.
	newNm := needle_map.NewMemDb()
	defer newNm.Close()

	// Copy the superblock with an incremented compaction revision.
	sb := super_block.SuperBlock{}
	if existingSb, err := super_block.ReadSuperBlock(srcDatBackend); err == nil {
		sb = existingSb
		sb.CompactionRevision++
	} else {
		// Fall back to a default superblock if reading fails.
		sb = super_block.SuperBlock{
			Version:            version,
			ReplicaPlacement:   &super_block.ReplicaPlacement{},
			CompactionRevision: 1,
		}
	}
	if _, err := dstDatBackend.WriteAt(sb.Bytes(), 0); err != nil {
		return fmt.Errorf("failed to write superblock: %w", err)
	}
	newOffset := int64(sb.BlockSize())

	// Copy only the non-deleted needles.
	err = oldNm.AscendingVisit(func(value needle_map.NeedleValue) error {
		offset, size := value.Offset, value.Size

		// Skip deleted needles; this is the key filtering step.
		if offset.IsZero() || size.IsDeleted() {
			return nil
		}

		// Read the needle from the source volume.
		n := new(needle.Needle)
		if err := n.ReadData(srcDatBackend, offset.ToActualOffset(), size, version); err != nil {
			return fmt.Errorf("cannot read needle from source: %w", err)
		}

		// Record the needle's new location, then append it to the destination.
		if err := newNm.Set(n.Id, types.ToOffset(newOffset), n.Size); err != nil {
			return fmt.Errorf("cannot set needle in new map: %w", err)
		}
		if _, _, _, err := n.Append(dstDatBackend, sb.Version); err != nil {
			return fmt.Errorf("cannot append needle to destination: %w", err)
		}
		newOffset += n.DiskSize(version)
		return nil
	})
	if err != nil {
		return fmt.Errorf("failed to copy needles: %w", err)
	}

	// Save the new index file.
	return newNm.SaveToIdx(dstIdxName)
}