package erasure_coding import ( "fmt" "io" "os" "github.com/seaweedfs/seaweedfs/weed/storage/backend" "github.com/seaweedfs/seaweedfs/weed/storage/idx" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" ) // write .idx file from .ecx and .ecj files func WriteIdxFileFromEcIndex(baseFileName string) (err error) { return WriteIdxFileFromEcIndexToTarget(baseFileName, baseFileName) } // WriteIdxFileFromEcIndexToTarget writes .idx file from .ecx and .ecj files with separate source and target func WriteIdxFileFromEcIndexToTarget(sourceBaseName, targetBaseName string) (err error) { ecxFile, openErr := os.OpenFile(sourceBaseName+".ecx", os.O_RDONLY, 0644) if openErr != nil { return fmt.Errorf("cannot open ec index %s.ecx: %v", sourceBaseName, openErr) } defer ecxFile.Close() idxFile, openErr := os.OpenFile(targetBaseName+".idx", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) if openErr != nil { return fmt.Errorf("cannot open %s.idx: %v", targetBaseName, openErr) } defer idxFile.Close() io.Copy(idxFile, ecxFile) err = iterateEcjFile(sourceBaseName, func(key types.NeedleId) error { bytes := needle_map.ToBytes(key, types.Offset{}, types.TombstoneFileSize) idxFile.Write(bytes) return nil }) return err } // FindDatFileSize calculate .dat file size from max offset entry // there may be extra deletions after that entry // but they are deletions anyway func FindDatFileSize(dataBaseFileName, indexBaseFileName string) (datSize int64, err error) { version, err := readEcVolumeVersion(dataBaseFileName) if err != nil { return 0, fmt.Errorf("read ec volume %s version: %v", dataBaseFileName, err) } err = iterateEcxFile(indexBaseFileName, func(key types.NeedleId, offset types.Offset, size types.Size) error { if size.IsDeleted() { return nil } entryStopOffset := offset.ToActualOffset() + needle.GetActualSize(size, version) if datSize < entryStopOffset { datSize = entryStopOffset } return nil }) return } func readEcVolumeVersion(baseFileName string) (version needle.Version, err error) { // find volume version datFile, err := os.OpenFile(baseFileName+".ec00", os.O_RDONLY, 0644) if err != nil { return 0, fmt.Errorf("open ec volume %s superblock: %v", baseFileName, err) } datBackend := backend.NewDiskFile(datFile) superBlock, err := super_block.ReadSuperBlock(datBackend) datBackend.Close() if err != nil { return 0, fmt.Errorf("read ec volume %s superblock: %v", baseFileName, err) } return superBlock.Version, nil } func iterateEcxFile(baseFileName string, processNeedleFn func(key types.NeedleId, offset types.Offset, size types.Size) error) error { ecxFile, openErr := os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644) if openErr != nil { return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr) } defer ecxFile.Close() buf := make([]byte, types.NeedleMapEntrySize) for { n, err := ecxFile.Read(buf) if n != types.NeedleMapEntrySize { if err == io.EOF { return nil } return err } key, offset, size := idx.IdxFileEntry(buf) if processNeedleFn != nil { err = processNeedleFn(key, offset, size) } if err != nil { if err != io.EOF { return err } return nil } } } func iterateEcjFile(baseFileName string, processNeedleFn func(key types.NeedleId) error) error { if !util.FileExists(baseFileName + ".ecj") { return nil } ecjFile, openErr := os.OpenFile(baseFileName+".ecj", os.O_RDONLY, 0644) if openErr != nil { return fmt.Errorf("cannot open ec index %s.ecj: %v", baseFileName, openErr) } defer ecjFile.Close() buf := make([]byte, types.NeedleIdSize) for { n, err := ecjFile.Read(buf) if n != types.NeedleIdSize { if err == io.EOF { return nil } return err } if processNeedleFn != nil { err = processNeedleFn(types.BytesToNeedleId(buf)) } if err != nil { if err == io.EOF { return nil } return err } } } // IterateEcjFile iterates through deleted needle IDs in an EC journal file // This is the public interface for reading .ecj files func IterateEcjFile(baseFileName string, processNeedleFn func(key types.NeedleId) error) error { return iterateEcjFile(baseFileName, processNeedleFn) } // WriteDatFile generates .dat from .ec00 ~ .ec09 files func WriteDatFile(baseFileName string, datFileSize int64, shardFileNames []string) error { datFile, openErr := os.OpenFile(baseFileName+".dat", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) if openErr != nil { return fmt.Errorf("cannot write volume %s.dat: %v", baseFileName, openErr) } defer datFile.Close() inputFiles := make([]*os.File, DataShardsCount) defer func() { for shardId := 0; shardId < DataShardsCount; shardId++ { if inputFiles[shardId] != nil { inputFiles[shardId].Close() } } }() for shardId := 0; shardId < DataShardsCount; shardId++ { inputFiles[shardId], openErr = os.OpenFile(shardFileNames[shardId], os.O_RDONLY, 0) if openErr != nil { return openErr } } for datFileSize >= DataShardsCount*ErasureCodingLargeBlockSize { for shardId := 0; shardId < DataShardsCount; shardId++ { w, err := io.CopyN(datFile, inputFiles[shardId], ErasureCodingLargeBlockSize) if w != ErasureCodingLargeBlockSize { return fmt.Errorf("copy %s large block on shardId %d: %v", baseFileName, shardId, err) } datFileSize -= ErasureCodingLargeBlockSize } } for datFileSize > 0 { for shardId := 0; shardId < DataShardsCount; shardId++ { toRead := min(datFileSize, ErasureCodingSmallBlockSize) w, err := io.CopyN(datFile, inputFiles[shardId], toRead) if w != toRead { return fmt.Errorf("copy %s small block %d: %v", baseFileName, shardId, err) } datFileSize -= toRead } } return nil } func min(x, y int64) int64 { if x > y { return y } return x } // WriteDatFileAndVacuum reconstructs volume from EC shards and then vacuums deleted needles // This reuses existing WriteDatFile and volume compaction logic to achieve the same result more cleanly // Creates cleaned volume files (without generation) that are ready for generational EC encoding func WriteDatFileAndVacuum(baseFileName string, shardFileNames []string) error { // Step 1: Use existing WriteDatFile to reconstruct the full volume datFileSize, err := FindDatFileSize(baseFileName, baseFileName) if err != nil { return fmt.Errorf("failed to find dat file size: %w", err) } tempDatFile := baseFileName + ".tmp.dat" tempBaseName := baseFileName + ".tmp" // WriteDatFile expects base name without .dat extension err = WriteDatFile(tempBaseName, datFileSize, shardFileNames) if err != nil { return fmt.Errorf("failed to reconstruct volume with WriteDatFile: %w", err) } defer os.Remove(tempDatFile) // cleanup temp file // Step 2: Create index file with deleted entries marked (use actual .ecx/.ecj files directly) tempIdxFile := baseFileName + ".tmp.idx" err = WriteIdxFileFromEcIndexToTarget(baseFileName, tempBaseName) // Read from actual files, create temp idx if err != nil { return fmt.Errorf("failed to create index file: %w", err) } defer os.Remove(tempIdxFile) // cleanup temp file // Step 3: Use existing volume compaction logic to filter out deleted needles version, err := readEcVolumeVersion(baseFileName) if err != nil { return fmt.Errorf("failed to read volume version: %w", err) } // Create cleaned volume files (without generation suffix) // These will later be copied to generation-aware names by encodeVolumeToEcShards() return copyDataBasedOnIndexFileForEcVacuum( tempDatFile, tempIdxFile, // source files (with deleted entries) baseFileName+".dat", baseFileName+".idx", // destination files (cleaned, ready for generational encoding) version, ) } // copyDataBasedOnIndexFileForEcVacuum copies only non-deleted needles from source to destination // This is a simplified version of volume_vacuum.go's copyDataBasedOnIndexFile for EC vacuum use func copyDataBasedOnIndexFileForEcVacuum(srcDatName, srcIdxName, dstDatName, dstIdxName string, version needle.Version) error { // Open source data file dataFile, err := os.Open(srcDatName) if err != nil { return fmt.Errorf("failed to open source dat file: %w", err) } srcDatBackend := backend.NewDiskFile(dataFile) defer srcDatBackend.Close() // Create destination data file dstDatBackend, err := backend.CreateVolumeFile(dstDatName, 0, 0) if err != nil { return fmt.Errorf("failed to create destination dat file: %w", err) } defer func() { dstDatBackend.Sync() dstDatBackend.Close() }() // Load needle map from source index oldNm := needle_map.NewMemDb() defer oldNm.Close() if err := oldNm.LoadFromIdx(srcIdxName); err != nil { return fmt.Errorf("failed to load index file: %w", err) } // Create new needle map for cleaned volume newNm := needle_map.NewMemDb() defer newNm.Close() // Copy superblock with incremented compaction revision sb := super_block.SuperBlock{} if existingSb, err := super_block.ReadSuperBlock(srcDatBackend); err == nil { sb = existingSb sb.CompactionRevision++ } else { // Use default superblock if reading fails sb = super_block.SuperBlock{ Version: version, ReplicaPlacement: &super_block.ReplicaPlacement{}, CompactionRevision: 1, } } dstDatBackend.WriteAt(sb.Bytes(), 0) newOffset := int64(sb.BlockSize()) // Copy only non-deleted needles err = oldNm.AscendingVisit(func(value needle_map.NeedleValue) error { offset, size := value.Offset, value.Size // Skip deleted needles (this is the key filtering logic!) if offset.IsZero() || size.IsDeleted() { return nil } // Read needle from source n := new(needle.Needle) if err := n.ReadData(srcDatBackend, offset.ToActualOffset(), size, version); err != nil { return fmt.Errorf("cannot read needle from source: %w", err) } // Write needle to destination if err := newNm.Set(n.Id, types.ToOffset(newOffset), n.Size); err != nil { return fmt.Errorf("cannot set needle in new map: %w", err) } if _, _, _, err := n.Append(dstDatBackend, sb.Version); err != nil { return fmt.Errorf("cannot append needle to destination: %w", err) } newOffset += n.DiskSize(version) return nil }) if err != nil { return fmt.Errorf("failed to copy needles: %w", err) } // Save the new index file return newNm.SaveToIdx(dstIdxName) }