diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index aac824318..e8c1d10e4 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -144,10 +144,26 @@ func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind Ne
 		return false
 	}
 
-	// skip if ec volumes exists
+	// parse out collection, volume id (moved up to use in EC validation)
+	vid, collection, err := volumeIdFromFileName(basename)
+	if err != nil {
+		glog.Warningf("get volume id failed, %s, err : %s", volumeName, err)
+		return false
+	}
+
+	// skip if an EC volume exists, but validate its files first
 	if skipIfEcVolumesExists {
-		if util.FileExists(l.IdxDirectory + "/" + volumeName + ".ecx") {
-			return false
+		ecxFilePath := filepath.Join(l.IdxDirectory, volumeName+".ecx")
+		if util.FileExists(ecxFilePath) {
+			// Check if EC volume is valid by verifying shard count
+			if !l.validateEcVolume(collection, vid) {
+				glog.Warningf("EC volume %d validation failed, removing incomplete EC files to allow .dat file loading", vid)
+				l.removeEcVolumeFiles(collection, vid)
+				// Continue to load .dat file
+			} else {
+				// Valid EC volume exists, skip .dat file
+				return false
+			}
 		}
 	}
 
@@ -161,13 +177,6 @@ func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind Ne
 		return false
 	}
 
-	// parse out collection, volume id
-	vid, collection, err := volumeIdFromFileName(basename)
-	if err != nil {
-		glog.Warningf("get volume id failed, %s, err : %s", volumeName, err)
-		return false
-	}
-
 	// avoid loading one volume more than once
 	l.volumesLock.RLock()
 	_, found := l.volumes[vid]
diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go
index 0db73adc6..128bfd26f 100644
--- a/weed/storage/disk_location_ec.go
+++ b/weed/storage/disk_location_ec.go
@@ -10,6 +10,7 @@ import (
 
 	"slices"
 
+	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
 	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
 )
@@ -40,6 +41,25 @@ func (l *DiskLocation) DestroyEcVolume(vid needle.VolumeId) {
 	}
 }
 
+// unloadEcVolume removes an EC volume from memory without deleting its files on disk.
+// This is useful for distributed EC volumes where shards may be on other servers.
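+// Unlike DestroyEcVolume, it only releases in-memory state and open file handles;
+// Close is called outside the write lock so that slow I/O cannot block lookups.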
+func (l *DiskLocation) unloadEcVolume(vid needle.VolumeId) {
+	var toClose *erasure_coding.EcVolume
+	l.ecVolumesLock.Lock()
+	if ecVolume, found := l.ecVolumes[vid]; found {
+		toClose = ecVolume
+		delete(l.ecVolumes, vid)
+	}
+	l.ecVolumesLock.Unlock()
+
+	// Close outside the lock to avoid holding write lock during I/O
+	if toClose != nil {
+		toClose.Close()
+	}
+}
+
 func (l *DiskLocation) CollectEcShards(vid needle.VolumeId, shardFileNames []string) (ecVolume *erasure_coding.EcVolume, found bool) {
 	l.ecVolumesLock.RLock()
 	defer l.ecVolumesLock.RUnlock()
@@ -154,8 +174,18 @@ func (l *DiskLocation) loadAllEcShards() (err error) {
 	slices.SortFunc(dirEntries, func(a, b os.DirEntry) int {
 		return strings.Compare(a.Name(), b.Name())
 	})
+
 	var sameVolumeShards []string
 	var prevVolumeId needle.VolumeId
+	var prevCollection string
+
+	// Helper to reset the shard-grouping state between volumes
+	reset := func() {
+		sameVolumeShards = nil
+		prevVolumeId = 0
+		prevCollection = ""
+	}
+
 	for _, fileInfo := range dirEntries {
 		if fileInfo.IsDir() {
 			continue
@@ -178,24 +208,31 @@ func (l *DiskLocation) loadAllEcShards() (err error) {
 		// 0 byte files should be only appearing erroneously for ec data files
 		// so we ignore them
 		if re.MatchString(ext) && info.Size() > 0 {
-			if prevVolumeId == 0 || volumeId == prevVolumeId {
+			// Group shards by both collection and volumeId to avoid mixing collections
+			if prevVolumeId == 0 || (volumeId == prevVolumeId && collection == prevCollection) {
 				sameVolumeShards = append(sameVolumeShards, fileInfo.Name())
 			} else {
+				// Before starting a new group, check if the previous group had orphaned shards
+				l.checkOrphanedShards(sameVolumeShards, prevCollection, prevVolumeId)
 				sameVolumeShards = []string{fileInfo.Name()}
 			}
 			prevVolumeId = volumeId
+			prevCollection = collection
 			continue
 		}
 
-		if ext == ".ecx" && volumeId == prevVolumeId {
-			if err = l.loadEcShards(sameVolumeShards, collection, volumeId); err != nil {
-				return fmt.Errorf("loadEcShards collection:%v volumeId:%d : %v", collection, volumeId, err)
-			}
-			prevVolumeId = volumeId
+		if ext == ".ecx" && volumeId == prevVolumeId && collection == prevCollection {
+			l.handleFoundEcxFile(sameVolumeShards, collection, volumeId)
+			reset()
 			continue
 		}
 	}
+
+	// Check for orphaned EC shards without .ecx file at the end of the directory scan
+	// This handles the last group of shards in the directory
+	l.checkOrphanedShards(sameVolumeShards, prevCollection, prevVolumeId)
+
 	return nil
 }
@@ -237,3 +274,220 @@ func (l *DiskLocation) EcShardCount() int {
 	}
 	return shardCount
 }
+
+// handleFoundEcxFile processes a complete group of EC shards when their .ecx file is found.
+// This includes validation, loading, and cleanup of incomplete/invalid EC volumes.
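+//
+// Decision summary:
+//   - .dat present and validation fails: remove the EC files so the .dat file can load
+//   - .dat present and shard loading fails: unload, remove the EC files, fall back to .dat
+//   - .dat absent and shard loading fails: unload in-memory state only (distributed EC)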
+func (l *DiskLocation) handleFoundEcxFile(shards []string, collection string, volumeId needle.VolumeId) {
+	// Check if this is an incomplete EC encoding (not a distributed EC volume)
+	// Key distinction: if .dat file still exists, EC encoding may have failed
+	// If .dat file is gone, this is likely a distributed EC volume with shards on multiple servers
+	baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(volumeId))
+	datFileName := baseFileName + ".dat"
+
+	// Determine .dat presence robustly; unexpected errors are treated as "exists"
+	datExists := l.checkDatFileExists(datFileName)
+
+	// Validate EC volume if .dat file exists (incomplete EC encoding scenario)
+	// This checks shard count, shard size consistency, and expected size vs .dat file
+	// If .dat is gone, EC encoding completed and shards are distributed across servers
+	if datExists && !l.validateEcVolume(collection, volumeId) {
+		glog.Warningf("Incomplete or invalid EC volume %d: .dat exists but validation failed, cleaning up EC files...", volumeId)
+		l.removeEcVolumeFiles(collection, volumeId)
+		return
+	}
+
+	// Attempt to load the EC shards
+	if err := l.loadEcShards(shards, collection, volumeId); err != nil {
+		// If EC shards failed to load and .dat still exists, clean up EC files to allow .dat file to be used
+		// If .dat is gone, log error but don't clean up (may be waiting for shards from other servers)
+		if datExists {
+			glog.Warningf("Failed to load EC shards for volume %d and .dat exists: %v, cleaning up EC files to use .dat...", volumeId, err)
+			// Unload first to release FDs, then remove files
+			l.unloadEcVolume(volumeId)
+			l.removeEcVolumeFiles(collection, volumeId)
+		} else {
+			glog.Warningf("Failed to load EC shards for volume %d: %v (this may be normal for distributed EC volumes)", volumeId, err)
+			// Clean up any partially loaded in-memory state. This does not delete files.
+			l.unloadEcVolume(volumeId)
+		}
+		return
+	}
+}
+
+// checkDatFileExists checks if .dat file exists with robust error handling.
+// Unexpected errors (permission, I/O) are treated as "exists" to avoid misclassifying
+// local EC as distributed EC, which is the safer fallback.
+func (l *DiskLocation) checkDatFileExists(datFileName string) bool {
+	if _, err := os.Stat(datFileName); err == nil {
+		return true
+	} else if !os.IsNotExist(err) {
+		glog.Warningf("Failed to stat .dat file %s: %v", datFileName, err)
+		// Safer to assume local .dat exists to avoid misclassifying as distributed EC
+		return true
+	}
+	return false
+}
+
+// checkOrphanedShards checks if the given shards are orphaned (no .ecx file) and cleans them up if needed.
+// Returns true if orphaned shards were found and cleaned up.
+// This handles the case where EC encoding was interrupted before creating the .ecx file.
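+//
+// Shards without a matching .ecx file are treated as orphans only while the volume's
+// .dat file is still present; without .dat they may belong to a distributed EC volume.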
+func (l *DiskLocation) checkOrphanedShards(shards []string, collection string, volumeId needle.VolumeId) bool {
+	if len(shards) == 0 || volumeId == 0 {
+		return false
+	}
+
+	// Check if .dat file exists (incomplete encoding, not distributed EC)
+	baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(volumeId))
+	datFileName := baseFileName + ".dat"
+
+	if l.checkDatFileExists(datFileName) {
+		glog.Warningf("Found %d EC shards without .ecx file for volume %d (incomplete encoding interrupted before .ecx creation), cleaning up...",
+			len(shards), volumeId)
+		l.removeEcVolumeFiles(collection, volumeId)
+		return true
+	}
+	return false
+}
+
+// calculateExpectedShardSize computes the exact expected shard size based on .dat file size
+// The EC encoding process is deterministic:
+// 1. Data is processed in batches of (LargeBlockSize * DataShardsCount) for large blocks
+// 2. Remaining data is processed in batches of (SmallBlockSize * DataShardsCount) for small blocks
+// 3. Each shard gets exactly its portion, with zero-padding applied to incomplete blocks
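+//
+// Worked example (with the 1GB large / 1MB small block sizes assumed in the tests
+// below): a 10.5GB .dat file fills one 10GB large batch (1GB per shard), leaving
+// 512MB, which rounds up to ceil(512MB / 10MB) = 52 small batches, i.e. 52MB more per shard.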
+func calculateExpectedShardSize(datFileSize int64) int64 {
+	var shardSize int64
+
+	// Process large blocks (1GB * 10 = 10GB batches)
+	largeBatchSize := int64(erasure_coding.ErasureCodingLargeBlockSize) * int64(erasure_coding.DataShardsCount)
+	numLargeBatches := datFileSize / largeBatchSize
+	shardSize = numLargeBatches * int64(erasure_coding.ErasureCodingLargeBlockSize)
+	remainingSize := datFileSize - (numLargeBatches * largeBatchSize)
+
+	// Process remaining data in small blocks (1MB * 10 = 10MB batches)
+	if remainingSize > 0 {
+		smallBatchSize := int64(erasure_coding.ErasureCodingSmallBlockSize) * int64(erasure_coding.DataShardsCount)
+		numSmallBatches := (remainingSize + smallBatchSize - 1) / smallBatchSize // Ceiling division
+		shardSize += numSmallBatches * int64(erasure_coding.ErasureCodingSmallBlockSize)
+	}
+
+	return shardSize
+}
+
+// validateEcVolume checks if EC volume has enough shards to be functional
+// For distributed EC volumes (where .dat is deleted), any number of shards is valid
+// For incomplete EC encoding (where .dat still exists), we need at least DataShardsCount shards
+// Also validates that all shards have the same size (required for Reed-Solomon EC)
+// If .dat exists, it also validates shards match the expected size based on .dat file size
+func (l *DiskLocation) validateEcVolume(collection string, vid needle.VolumeId) bool {
+	baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(vid))
+	datFileName := baseFileName + ".dat"
+
+	var expectedShardSize int64 = -1
+	datExists := false
+
+	// If .dat file exists, compute exact expected shard size from it
+	if datFileInfo, err := os.Stat(datFileName); err == nil {
+		datExists = true
+		expectedShardSize = calculateExpectedShardSize(datFileInfo.Size())
+	} else if !os.IsNotExist(err) {
+		// If stat fails with unexpected error (permission, I/O), fail validation
+		// Don't treat this as "distributed EC" - it could be a temporary error
+		glog.Warningf("Failed to stat .dat file %s: %v", datFileName, err)
+		return false
+	}
+
+	shardCount := 0
+	var actualShardSize int64 = -1
+
+	// Count shards and validate they all have the same size (required for Reed-Solomon EC)
+	// Shard files (.ec00 - .ec13) are always in l.Directory, not l.IdxDirectory
+	for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+		shardFileName := baseFileName + erasure_coding.ToExt(i)
+		fi, err := os.Stat(shardFileName)
+
+		if err == nil {
+			// Check if file has non-zero size
+			if fi.Size() > 0 {
+				// Validate all shards are the same size (required for Reed-Solomon EC)
+				if actualShardSize == -1 {
+					actualShardSize = fi.Size()
+				} else if fi.Size() != actualShardSize {
+					glog.Warningf("EC volume %d shard %d has size %d, expected %d (all EC shards must be same size)",
+						vid, i, fi.Size(), actualShardSize)
+					return false
+				}
+				shardCount++
+			}
+		} else if !os.IsNotExist(err) {
+			// If stat fails with unexpected error (permission, I/O), fail validation
+			// This is consistent with .dat file error handling
+			glog.Warningf("Failed to stat shard file %s: %v", shardFileName, err)
+			return false
+		}
+	}
+
+	// If .dat file exists, validate shard size matches expected size
+	if datExists && actualShardSize > 0 && expectedShardSize > 0 {
+		if actualShardSize != expectedShardSize {
+			glog.Warningf("EC volume %d: shard size %d doesn't match expected size %d (based on .dat file size)",
+				vid, actualShardSize, expectedShardSize)
+			return false
+		}
+	}
+
+	// If .dat file is gone, this is a distributed EC volume - any shard count is valid
+	if !datExists {
+		glog.V(1).Infof("EC volume %d: distributed EC (.dat removed) with %d shards", vid, shardCount)
+		return true
+	}
+
+	// If .dat file exists, we need at least DataShardsCount shards locally
+	// Otherwise it's an incomplete EC encoding that should be cleaned up
+	if shardCount < erasure_coding.DataShardsCount {
+		glog.Warningf("EC volume %d has .dat file but only %d shards (need at least %d for local EC)",
+			vid, shardCount, erasure_coding.DataShardsCount)
+		return false
+	}
+
+	return true
+}
+
+// removeEcVolumeFiles removes all EC-related files for a volume
+func (l *DiskLocation) removeEcVolumeFiles(collection string, vid needle.VolumeId) {
+	baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(vid))
+	indexBaseFileName := erasure_coding.EcShardFileName(collection, l.IdxDirectory, int(vid))
+
+	// Helper to remove a file with consistent error handling
+	removeFile := func(filePath, description string) {
+		if err := os.Remove(filePath); err != nil {
+			if !os.IsNotExist(err) {
+				glog.Warningf("Failed to remove incomplete %s %s: %v", description, filePath, err)
+			}
+		} else {
+			glog.V(2).Infof("Removed incomplete %s: %s", description, filePath)
+		}
+	}
+
+	// Remove index files first (.ecx, .ecj) before shard files
+	// This ensures that if cleanup is interrupted, the .ecx file won't trigger
+	// EC loading for incomplete/missing shards on next startup
+	removeFile(indexBaseFileName+".ecx", "EC index file")
+	removeFile(indexBaseFileName+".ecj", "EC journal file")
+
+	// Remove all EC shard files (.ec00 ~ .ec13) from data directory
+	for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+		removeFile(baseFileName+erasure_coding.ToExt(i), "EC shard file")
+	}
+}
diff --git a/weed/storage/disk_location_ec_realworld_test.go b/weed/storage/disk_location_ec_realworld_test.go
new file mode 100644
index 000000000..3a21ccb6c
--- /dev/null
+++ b/weed/storage/disk_location_ec_realworld_test.go
@@ -0,0 +1,198 @@
+package storage
+
+import (
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
+)
+
+// TestCalculateExpectedShardSizeWithRealEncoding validates our shard size calculation
+// by actually running EC encoding on real files and comparing the results
+func TestCalculateExpectedShardSizeWithRealEncoding(t *testing.T) {
+	tempDir := t.TempDir()
+
+	tests := []struct {
+		name        string
+		datFileSize int64
+		description string
+	}{
+		{
+			name:        "5MB file",
+			datFileSize: 5 * 1024 * 1024,
+			description: "Small file that needs 1 small block per shard",
+		},
+		{
+			name:        "10MB file (exactly 10 small blocks)",
+			datFileSize: 10 * 1024 * 1024,
+			description: "Exactly fits in 1MB small blocks",
+		},
+		{
+			name:        "15MB file",
+			datFileSize: 15 * 1024 * 1024,
+			description: "Requires 2 small blocks per shard",
+		},
+		{
+			name:        "50MB file",
+			datFileSize: 50 * 1024 * 1024,
+			description: "Requires 5 small blocks per shard",
+		},
+		{
+			name:        "100MB file",
+			datFileSize: 100 * 1024 * 1024,
+			description: "Requires 10 small blocks per shard",
+		},
+		{
+			name:        "512MB file",
+			datFileSize: 512 * 1024 * 1024,
+			description: "Requires 52 small blocks per shard (rounded up)",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Create a test .dat file with the specified size
+			baseFileName := filepath.Join(tempDir, "test_volume")
+			datFileName := baseFileName + ".dat"
+
+			// Create the .dat file with a repeating byte pattern (more realistic than all zeros)
+			datFile, err := os.Create(datFileName)
+			if err != nil {
+				t.Fatalf("Failed to create .dat file: %v", err)
+			}
+
+			// Write the pattern in 4KB chunks until the target size is reached
+			pattern := make([]byte, 4096)
+			for i := range pattern {
+				pattern[i] = byte(i % 256)
+			}
+
+			written := int64(0)
+			for written < tt.datFileSize {
+				toWrite := tt.datFileSize - written
+				if toWrite > int64(len(pattern)) {
+					toWrite = int64(len(pattern))
+				}
+				n, err := datFile.Write(pattern[:toWrite])
+				if err != nil {
+					t.Fatalf("Failed to write to .dat file: %v", err)
+				}
+				written += int64(n)
+			}
+			datFile.Close()
+
+			// Calculate expected shard size using our function
+			expectedShardSize := calculateExpectedShardSize(tt.datFileSize)
+
+			// Run actual EC encoding
+			err = erasure_coding.WriteEcFiles(baseFileName)
+			if err != nil {
+				t.Fatalf("Failed to encode EC files: %v", err)
+			}
+
+			// Measure actual shard sizes
+			for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+				shardFileName := baseFileName + erasure_coding.ToExt(i)
+				shardInfo, err := os.Stat(shardFileName)
+				if err != nil {
+					t.Fatalf("Failed to stat shard file %s: %v", shardFileName, err)
+				}
+
+				actualShardSize := shardInfo.Size()
+
+				// Verify actual size matches expected size
+				if actualShardSize != expectedShardSize {
+					t.Errorf("Shard %d size mismatch:\n"+
+						"  .dat file size: %d bytes\n"+
+						"  Expected shard size: %d bytes\n"+
+						"  Actual shard size: %d bytes\n"+
+						"  Difference: %d bytes\n"+
+						"  %s",
+						i, tt.datFileSize, expectedShardSize, actualShardSize,
+						actualShardSize-expectedShardSize, tt.description)
+				}
+			}
+
+			// If we got here, all shards match!
+ t.Logf("✓ SUCCESS: .dat size %d → actual shard size %d matches calculated size (%s)", + tt.datFileSize, expectedShardSize, tt.description) + + // Cleanup + os.Remove(datFileName) + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + os.Remove(baseFileName + erasure_coding.ToExt(i)) + } + }) + } +} + +// TestCalculateExpectedShardSizeEdgeCases tests edge cases with real encoding +func TestCalculateExpectedShardSizeEdgeCases(t *testing.T) { + tempDir := t.TempDir() + + tests := []struct { + name string + datFileSize int64 + }{ + {"1 byte file", 1}, + {"1KB file", 1024}, + {"10KB file", 10 * 1024}, + {"1MB file (1 small block)", 1024 * 1024}, + {"1MB + 1 byte", 1024*1024 + 1}, + {"9.9MB (almost 1 small block per shard)", 9*1024*1024 + 900*1024}, + {"10.1MB (just over 1 small block per shard)", 10*1024*1024 + 100*1024}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + baseFileName := filepath.Join(tempDir, tt.name) + datFileName := baseFileName + ".dat" + + // Create .dat file + datFile, err := os.Create(datFileName) + if err != nil { + t.Fatalf("Failed to create .dat file: %v", err) + } + + // Write exactly the specified number of bytes + data := make([]byte, tt.datFileSize) + for i := range data { + data[i] = byte(i % 256) + } + datFile.Write(data) + datFile.Close() + + // Calculate expected + expectedShardSize := calculateExpectedShardSize(tt.datFileSize) + + // Run actual EC encoding + err = erasure_coding.WriteEcFiles(baseFileName) + if err != nil { + t.Fatalf("Failed to encode EC files: %v", err) + } + + // Check first shard (all should be same size) + shardFileName := baseFileName + erasure_coding.ToExt(0) + shardInfo, err := os.Stat(shardFileName) + if err != nil { + t.Fatalf("Failed to stat shard file: %v", err) + } + + actualShardSize := shardInfo.Size() + + if actualShardSize != expectedShardSize { + t.Errorf("File size %d: expected shard %d, got %d (diff: %d)", + tt.datFileSize, expectedShardSize, actualShardSize, actualShardSize-expectedShardSize) + } else { + t.Logf("✓ File size %d → shard size %d (correct)", tt.datFileSize, actualShardSize) + } + + // Cleanup + os.Remove(datFileName) + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + os.Remove(baseFileName + erasure_coding.ToExt(i)) + } + }) + } +} diff --git a/weed/storage/disk_location_ec_shard_size_test.go b/weed/storage/disk_location_ec_shard_size_test.go new file mode 100644 index 000000000..e58c1c129 --- /dev/null +++ b/weed/storage/disk_location_ec_shard_size_test.go @@ -0,0 +1,195 @@ +package storage + +import ( + "testing" +) + +func TestCalculateExpectedShardSize(t *testing.T) { + const ( + largeBlock = 1024 * 1024 * 1024 // 1GB + smallBlock = 1024 * 1024 // 1MB + dataShards = 10 + largeBatchSize = largeBlock * dataShards // 10GB + smallBatchSize = smallBlock * dataShards // 10MB + ) + + tests := []struct { + name string + datFileSize int64 + expectedShardSize int64 + description string + }{ + // Edge case: empty file + { + name: "0 bytes (empty file)", + datFileSize: 0, + expectedShardSize: 0, + description: "Empty file has 0 shard size", + }, + + // Boundary tests: exact multiples of large block + { + name: "Exact 10GB (1 large batch)", + datFileSize: largeBatchSize, // 10GB = 1 large batch + expectedShardSize: largeBlock, // 1GB per shard + description: "Exactly fits in large blocks", + }, + { + name: "Exact 20GB (2 large batches)", + datFileSize: 2 * largeBatchSize, // 20GB + expectedShardSize: 2 * largeBlock, // 2GB per shard + description: "2 complete large batches", + }, 
+		{
+			name:              "Just under large batch (10GB - 1 byte)",
+			datFileSize:       largeBatchSize - 1, // 10,737,418,239 bytes
+			expectedShardSize: 1024 * smallBlock,  // 1024MB = 1GB (needs 1024 small blocks)
+			description:       "Just under 10GB needs 1024 small blocks",
+		},
+		{
+			name:              "Just over large batch (10GB + 1 byte)",
+			datFileSize:       largeBatchSize + 1,      // 10GB + 1 byte
+			expectedShardSize: largeBlock + smallBlock, // 1GB + 1MB
+			description:       "Just over 10GB adds 1 small block",
+		},
+
+		// Boundary tests: exact multiples of small batch
+		{
+			name:              "Exact 10MB (1 small batch)",
+			datFileSize:       smallBatchSize, // 10MB
+			expectedShardSize: smallBlock,     // 1MB per shard
+			description:       "Exactly fits in 1 small batch",
+		},
+		{
+			name:              "Exact 20MB (2 small batches)",
+			datFileSize:       2 * smallBatchSize, // 20MB
+			expectedShardSize: 2 * smallBlock,     // 2MB per shard
+			description:       "2 complete small batches",
+		},
+		{
+			name:              "Just under small batch (10MB - 1 byte)",
+			datFileSize:       smallBatchSize - 1, // 10MB - 1 byte
+			expectedShardSize: smallBlock,         // Still needs 1MB per shard (rounds up)
+			description:       "Just under 10MB rounds up to 1 small block",
+		},
+		{
+			name:              "Just over small batch (10MB + 1 byte)",
+			datFileSize:       smallBatchSize + 1, // 10MB + 1 byte
+			expectedShardSize: 2 * smallBlock,     // 2MB per shard
+			description:       "Just over 10MB needs 2 small blocks",
+		},
+
+		// Mixed: large batch + partial small batch
+		{
+			name:              "10GB + 1MB",
+			datFileSize:       largeBatchSize + 1*1024*1024, // 10GB + 1MB
+			expectedShardSize: largeBlock + smallBlock,      // 1GB + 1MB
+			description:       "1 large batch + 1MB needs 1 small block",
+		},
+		{
+			name:              "10GB + 5MB",
+			datFileSize:       largeBatchSize + 5*1024*1024, // 10GB + 5MB
+			expectedShardSize: largeBlock + smallBlock,      // 1GB + 1MB
+			description:       "1 large batch + 5MB rounds up to 1 small block",
+		},
+		{
+			name:              "10GB + 15MB",
+			datFileSize:       largeBatchSize + 15*1024*1024, // 10GB + 15MB
+			expectedShardSize: largeBlock + 2*smallBlock,     // 1GB + 2MB
+			description:       "1 large batch + 15MB needs 2 small blocks",
+		},
+
+		// Original test cases
+		{
+			name:              "11GB (1 large batch + 103 small blocks)",
+			datFileSize:       11 * 1024 * 1024 * 1024,          // 11GB
+			expectedShardSize: 1*1024*1024*1024 + 103*1024*1024, // 1GB + 103MB (103 small blocks for 1GB remaining)
+			description:       "1GB large + 1GB remaining needs 103 small blocks",
+		},
+		{
+			name:              "5MB (requires 1 small block per shard)",
+			datFileSize:       5 * 1024 * 1024, // 5MB
+			expectedShardSize: 1 * 1024 * 1024, // 1MB per shard (rounded up)
+			description:       "Small file rounds up to 1MB per shard",
+		},
+		{
+			name:              "1KB (minimum size)",
+			datFileSize:       1024,
+			expectedShardSize: 1 * 1024 * 1024, // 1MB per shard (1 small block)
+			description:       "Tiny file needs 1 small block",
+		},
+		{
+			name:              "10.5GB (mixed)",
+			datFileSize:       10*1024*1024*1024 + 512*1024*1024, // 10.5GB
+			expectedShardSize: 1*1024*1024*1024 + 52*1024*1024,   // 1GB + 52MB (52 small blocks for 512MB remaining)
+			description:       "1GB large + 512MB remaining needs 52 small blocks",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			actualShardSize := calculateExpectedShardSize(tt.datFileSize)
+
+			if actualShardSize != tt.expectedShardSize {
%s", + tt.expectedShardSize, actualShardSize, tt.description) + } + + t.Logf("✓ File size: %d → Shard size: %d (%s)", + tt.datFileSize, actualShardSize, tt.description) + }) + } +} + +// TestShardSizeValidationScenarios tests realistic scenarios +func TestShardSizeValidationScenarios(t *testing.T) { + scenarios := []struct { + name string + datFileSize int64 + actualShardSize int64 + shouldBeValid bool + }{ + { + name: "Valid: exact match for 10GB", + datFileSize: 10 * 1024 * 1024 * 1024, // 10GB + actualShardSize: 1 * 1024 * 1024 * 1024, // 1GB (exact) + shouldBeValid: true, + }, + { + name: "Invalid: 1 byte too small", + datFileSize: 10 * 1024 * 1024 * 1024, // 10GB + actualShardSize: 1*1024*1024*1024 - 1, // 1GB - 1 byte + shouldBeValid: false, + }, + { + name: "Invalid: 1 byte too large", + datFileSize: 10 * 1024 * 1024 * 1024, // 10GB + actualShardSize: 1*1024*1024*1024 + 1, // 1GB + 1 byte + shouldBeValid: false, + }, + { + name: "Valid: small file exact match", + datFileSize: 5 * 1024 * 1024, // 5MB + actualShardSize: 1 * 1024 * 1024, // 1MB (exact) + shouldBeValid: true, + }, + { + name: "Invalid: wrong size for small file", + datFileSize: 5 * 1024 * 1024, // 5MB + actualShardSize: 500 * 1024, // 500KB (too small) + shouldBeValid: false, + }, + } + + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + expectedSize := calculateExpectedShardSize(scenario.datFileSize) + isValid := scenario.actualShardSize == expectedSize + + if isValid != scenario.shouldBeValid { + t.Errorf("Expected validation result %v, got %v. Actual shard: %d, Expected: %d", + scenario.shouldBeValid, isValid, scenario.actualShardSize, expectedSize) + } + }) + } +} diff --git a/weed/storage/disk_location_ec_test.go b/weed/storage/disk_location_ec_test.go new file mode 100644 index 000000000..097536118 --- /dev/null +++ b/weed/storage/disk_location_ec_test.go @@ -0,0 +1,643 @@ +package storage + +import ( + "os" + "path/filepath" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +// TestIncompleteEcEncodingCleanup tests the cleanup logic for incomplete EC encoding scenarios +func TestIncompleteEcEncodingCleanup(t *testing.T) { + tests := []struct { + name string + volumeId needle.VolumeId + collection string + createDatFile bool + createEcxFile bool + createEcjFile bool + numShards int + expectCleanup bool + expectLoadSuccess bool + }{ + { + name: "Incomplete EC: shards without .ecx, .dat exists - should cleanup", + volumeId: 100, + collection: "", + createDatFile: true, + createEcxFile: false, + createEcjFile: false, + numShards: 14, // All shards but no .ecx + expectCleanup: true, + expectLoadSuccess: false, + }, + { + name: "Distributed EC: shards without .ecx, .dat deleted - should NOT cleanup", + volumeId: 101, + collection: "", + createDatFile: false, + createEcxFile: false, + createEcjFile: false, + numShards: 5, // Partial shards, distributed + expectCleanup: false, + expectLoadSuccess: false, + }, + { + name: "Incomplete EC: shards with .ecx but < 10 shards, .dat exists - should cleanup", + volumeId: 102, + collection: "", + createDatFile: true, + createEcxFile: true, + createEcjFile: false, + numShards: 7, // Less than DataShardsCount (10) + expectCleanup: true, + expectLoadSuccess: false, + }, + { + name: "Valid local EC: shards with .ecx, >= 10 shards, .dat exists - should load", + volumeId: 
+	for _, scenario := range scenarios {
+		t.Run(scenario.name, func(t *testing.T) {
+			expectedSize := calculateExpectedShardSize(scenario.datFileSize)
+			isValid := scenario.actualShardSize == expectedSize
+
+			if isValid != scenario.shouldBeValid {
+				t.Errorf("Expected validation result %v, got %v. Actual shard: %d, Expected: %d",
+					scenario.shouldBeValid, isValid, scenario.actualShardSize, expectedSize)
+			}
+		})
+	}
+}
diff --git a/weed/storage/disk_location_ec_test.go b/weed/storage/disk_location_ec_test.go
new file mode 100644
index 000000000..097536118
--- /dev/null
+++ b/weed/storage/disk_location_ec_test.go
@@ -0,0 +1,645 @@
+package storage
+
+import (
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
+	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
+	"github.com/seaweedfs/seaweedfs/weed/storage/types"
+	"github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+// TestIncompleteEcEncodingCleanup tests the cleanup logic for incomplete EC encoding scenarios
+func TestIncompleteEcEncodingCleanup(t *testing.T) {
+	tests := []struct {
+		name              string
+		volumeId          needle.VolumeId
+		collection        string
+		createDatFile     bool
+		createEcxFile     bool
+		createEcjFile     bool
+		numShards         int
+		expectCleanup     bool
+		expectLoadSuccess bool
+	}{
+		{
+			name:              "Incomplete EC: shards without .ecx, .dat exists - should cleanup",
+			volumeId:          100,
+			collection:        "",
+			createDatFile:     true,
+			createEcxFile:     false,
+			createEcjFile:     false,
+			numShards:         14, // All shards but no .ecx
+			expectCleanup:     true,
+			expectLoadSuccess: false,
+		},
+		{
+			name:              "Distributed EC: shards without .ecx, .dat deleted - should NOT cleanup",
+			volumeId:          101,
+			collection:        "",
+			createDatFile:     false,
+			createEcxFile:     false,
+			createEcjFile:     false,
+			numShards:         5, // Partial shards, distributed
+			expectCleanup:     false,
+			expectLoadSuccess: false,
+		},
+		{
+			name:              "Incomplete EC: shards with .ecx but < 10 shards, .dat exists - should cleanup",
+			volumeId:          102,
+			collection:        "",
+			createDatFile:     true,
+			createEcxFile:     true,
+			createEcjFile:     false,
+			numShards:         7, // Less than DataShardsCount (10)
+			expectCleanup:     true,
+			expectLoadSuccess: false,
+		},
+		{
+			name:              "Valid local EC: shards with .ecx, >= 10 shards, .dat exists - should load",
+			volumeId:          103,
+			collection:        "",
+			createDatFile:     true,
+			createEcxFile:     true,
+			createEcjFile:     false,
+			numShards:         14, // All shards
+			expectCleanup:     false,
+			expectLoadSuccess: true, // Would succeed if .ecx was valid
+		},
+		{
+			name:              "Distributed EC: shards with .ecx, .dat deleted - should load",
+			volumeId:          104,
+			collection:        "",
+			createDatFile:     false,
+			createEcxFile:     true,
+			createEcjFile:     false,
+			numShards:         10, // Enough shards
+			expectCleanup:     false,
+			expectLoadSuccess: true, // Would succeed if .ecx was valid
+		},
+		{
+			name:              "Incomplete EC with collection: shards without .ecx, .dat exists - should cleanup",
+			volumeId:          105,
+			collection:        "test_collection",
+			createDatFile:     true,
+			createEcxFile:     false,
+			createEcjFile:     false,
+			numShards:         14,
+			expectCleanup:     true,
+			expectLoadSuccess: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Use per-subtest temp directory for stronger isolation
+			tempDir := t.TempDir()
+
+			// Create DiskLocation
+			minFreeSpace := util.MinFreeSpace{Type: util.AsPercent, Percent: 1, Raw: "1"}
+			diskLocation := &DiskLocation{
+				Directory:              tempDir,
+				DirectoryUuid:          "test-uuid",
+				IdxDirectory:           tempDir,
+				DiskType:               types.HddType,
+				MaxVolumeCount:         100,
+				OriginalMaxVolumeCount: 100,
+				MinFreeSpace:           minFreeSpace,
+			}
+			diskLocation.volumes = make(map[needle.VolumeId]*Volume)
+			diskLocation.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume)
+
+			// Setup test files
+			baseFileName := erasure_coding.EcShardFileName(tt.collection, tempDir, int(tt.volumeId))
+
+			// Use deterministic but small size: 10MB .dat => 1MB per shard
+			datFileSize := int64(10 * 1024 * 1024) // 10MB
+			expectedShardSize := calculateExpectedShardSize(datFileSize)
+
+			// Create .dat file if needed
+			if tt.createDatFile {
+				datFile, err := os.Create(baseFileName + ".dat")
+				if err != nil {
+					t.Fatalf("Failed to create .dat file: %v", err)
+				}
+				if err := datFile.Truncate(datFileSize); err != nil {
+					t.Fatalf("Failed to truncate .dat file: %v", err)
+				}
+				if err := datFile.Close(); err != nil {
+					t.Fatalf("Failed to close .dat file: %v", err)
+				}
+			}
+
+			// Create EC shard files
+			for i := 0; i < tt.numShards; i++ {
+				shardFile, err := os.Create(baseFileName + erasure_coding.ToExt(i))
+				if err != nil {
+					t.Fatalf("Failed to create shard file: %v", err)
+				}
+				if err := shardFile.Truncate(expectedShardSize); err != nil {
+					t.Fatalf("Failed to truncate shard file: %v", err)
+				}
+				if err := shardFile.Close(); err != nil {
+					t.Fatalf("Failed to close shard file: %v", err)
+				}
+			}
+
+			// Create .ecx file if needed
+			if tt.createEcxFile {
+				ecxFile, err := os.Create(baseFileName + ".ecx")
+				if err != nil {
+					t.Fatalf("Failed to create .ecx file: %v", err)
+				}
+				if _, err := ecxFile.WriteString("dummy ecx data"); err != nil {
+					ecxFile.Close()
+					t.Fatalf("Failed to write .ecx file: %v", err)
+				}
+				if err := ecxFile.Close(); err != nil {
+					t.Fatalf("Failed to close .ecx file: %v", err)
+				}
+			}
+
+			// Create .ecj file if needed
+			if tt.createEcjFile {
+				ecjFile, err := os.Create(baseFileName + ".ecj")
+				if err != nil {
+					t.Fatalf("Failed to create .ecj file: %v", err)
+				}
+				if _, err := ecjFile.WriteString("dummy ecj data"); err != nil {
+					ecjFile.Close()
+					t.Fatalf("Failed to write .ecj file: %v", err)
+				}
+				if err := ecjFile.Close(); err != nil {
+					t.Fatalf("Failed to close .ecj file: %v", err)
+				}
+			}
+
+			// Run loadAllEcShards
+			loadErr := diskLocation.loadAllEcShards()
%v", loadErr) + } + + // Test idempotency - running again should not cause issues + loadErr2 := diskLocation.loadAllEcShards() + if loadErr2 != nil { + t.Logf("Second loadAllEcShards returned error: %v", loadErr2) + } + + // Verify cleanup expectations + if tt.expectCleanup { + // Check that files were cleaned up + if util.FileExists(baseFileName + ".ecx") { + t.Errorf("Expected .ecx to be cleaned up but it still exists") + } + if util.FileExists(baseFileName + ".ecj") { + t.Errorf("Expected .ecj to be cleaned up but it still exists") + } + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + shardFile := baseFileName + erasure_coding.ToExt(i) + if util.FileExists(shardFile) { + t.Errorf("Expected shard %d to be cleaned up but it still exists", i) + } + } + // .dat file should still exist (not cleaned up) + if tt.createDatFile && !util.FileExists(baseFileName+".dat") { + t.Errorf("Expected .dat file to remain but it was deleted") + } + } else { + // Check that files were NOT cleaned up + for i := 0; i < tt.numShards; i++ { + shardFile := baseFileName + erasure_coding.ToExt(i) + if !util.FileExists(shardFile) { + t.Errorf("Expected shard %d to remain but it was cleaned up", i) + } + } + if tt.createEcxFile && !util.FileExists(baseFileName+".ecx") { + t.Errorf("Expected .ecx to remain but it was cleaned up") + } + } + + // Verify load expectations + if tt.expectLoadSuccess { + if diskLocation.EcShardCount() == 0 { + t.Errorf("Expected EC shards to be loaded for volume %d", tt.volumeId) + } + } + + }) + } +} + +// TestValidateEcVolume tests the validateEcVolume function +func TestValidateEcVolume(t *testing.T) { + tempDir := t.TempDir() + + minFreeSpace := util.MinFreeSpace{Type: util.AsPercent, Percent: 1, Raw: "1"} + diskLocation := &DiskLocation{ + Directory: tempDir, + DirectoryUuid: "test-uuid", + IdxDirectory: tempDir, + DiskType: types.HddType, + MinFreeSpace: minFreeSpace, + } + + tests := []struct { + name string + volumeId needle.VolumeId + collection string + createDatFile bool + numShards int + expectValid bool + }{ + { + name: "Valid: .dat exists with 10+ shards", + volumeId: 200, + collection: "", + createDatFile: true, + numShards: 10, + expectValid: true, + }, + { + name: "Invalid: .dat exists with < 10 shards", + volumeId: 201, + collection: "", + createDatFile: true, + numShards: 9, + expectValid: false, + }, + { + name: "Valid: .dat deleted (distributed EC) with any shards", + volumeId: 202, + collection: "", + createDatFile: false, + numShards: 5, + expectValid: true, + }, + { + name: "Valid: .dat deleted (distributed EC) with no shards", + volumeId: 203, + collection: "", + createDatFile: false, + numShards: 0, + expectValid: true, + }, + { + name: "Invalid: zero-byte shard files should not count", + volumeId: 204, + collection: "", + createDatFile: true, + numShards: 0, // Will create 10 zero-byte files below + expectValid: false, + }, + { + name: "Invalid: .dat exists with different size shards", + volumeId: 205, + collection: "", + createDatFile: true, + numShards: 10, // Will create shards with varying sizes + expectValid: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + baseFileName := erasure_coding.EcShardFileName(tt.collection, tempDir, int(tt.volumeId)) + + // For proper testing, we need to use realistic sizes that match EC encoding + // EC uses large blocks (1GB) and small blocks (1MB) + // For test purposes, use a small .dat file size that still exercises the logic + // 10MB .dat file = 1MB per shard (one small batch, 
+			datFileSize := int64(10 * 1024 * 1024) // 10MB
+			expectedShardSize := calculateExpectedShardSize(datFileSize)
+
+			// Create .dat file if needed
+			if tt.createDatFile {
+				datFile, err := os.Create(baseFileName + ".dat")
+				if err != nil {
+					t.Fatalf("Failed to create .dat file: %v", err)
+				}
+				// Truncate to the target size; no real data is needed for these tests
+				datFile.Truncate(datFileSize)
+				datFile.Close()
+			}
+
+			// Create EC shard files with correct size
+			for i := 0; i < tt.numShards; i++ {
+				shardFile, err := os.Create(baseFileName + erasure_coding.ToExt(i))
+				if err != nil {
+					t.Fatalf("Failed to create shard file: %v", err)
+				}
+				// Use truncate to create file of correct size without allocating all the space
+				if err := shardFile.Truncate(expectedShardSize); err != nil {
+					shardFile.Close()
+					t.Fatalf("Failed to truncate shard file: %v", err)
+				}
+				if err := shardFile.Close(); err != nil {
+					t.Fatalf("Failed to close shard file: %v", err)
+				}
+			}
+
+			// For zero-byte test case, create empty files for all data shards
+			if tt.volumeId == 204 {
+				for i := 0; i < erasure_coding.DataShardsCount; i++ {
+					shardFile, err := os.Create(baseFileName + erasure_coding.ToExt(i))
+					if err != nil {
+						t.Fatalf("Failed to create empty shard file: %v", err)
+					}
+					// Don't write anything - leave as zero-byte
+					shardFile.Close()
+				}
+			}
+
+			// For mismatched shard size test case, create shards with different sizes
+			if tt.volumeId == 205 {
+				for i := 0; i < erasure_coding.DataShardsCount; i++ {
+					shardFile, err := os.Create(baseFileName + erasure_coding.ToExt(i))
+					if err != nil {
+						t.Fatalf("Failed to create shard file: %v", err)
+					}
+					// Write different amount of data to each shard
+					data := make([]byte, 100+i*10)
+					shardFile.Write(data)
+					shardFile.Close()
+				}
+			}
+
+			// Test validation
+			isValid := diskLocation.validateEcVolume(tt.collection, tt.volumeId)
+			if isValid != tt.expectValid {
+				t.Errorf("Expected validation result %v but got %v", tt.expectValid, isValid)
+			}
+		})
+	}
+}
+
+// TestRemoveEcVolumeFiles tests the removeEcVolumeFiles function
+func TestRemoveEcVolumeFiles(t *testing.T) {
+	tests := []struct {
+		name           string
+		separateIdxDir bool
+	}{
+		{"Same directory for data and index", false},
+		{"Separate idx directory", true},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			tempDir := t.TempDir()
+
+			var dataDir, idxDir string
+			if tt.separateIdxDir {
+				dataDir = filepath.Join(tempDir, "data")
+				idxDir = filepath.Join(tempDir, "idx")
+				os.MkdirAll(dataDir, 0755)
+				os.MkdirAll(idxDir, 0755)
+			} else {
+				dataDir = tempDir
+				idxDir = tempDir
+			}
+
+			minFreeSpace := util.MinFreeSpace{Type: util.AsPercent, Percent: 1, Raw: "1"}
+			diskLocation := &DiskLocation{
+				Directory:     dataDir,
+				DirectoryUuid: "test-uuid",
+				IdxDirectory:  idxDir,
+				DiskType:      types.HddType,
+				MinFreeSpace:  minFreeSpace,
+			}
+
+			volumeId := needle.VolumeId(300)
+			collection := ""
+			dataBaseFileName := erasure_coding.EcShardFileName(collection, dataDir, int(volumeId))
+			idxBaseFileName := erasure_coding.EcShardFileName(collection, idxDir, int(volumeId))
+
+			// Create all EC shard files in data directory
+			for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+				shardFile, err := os.Create(dataBaseFileName + erasure_coding.ToExt(i))
+				if err != nil {
+					t.Fatalf("Failed to create shard file: %v", err)
+				}
+				if _, err := shardFile.WriteString("dummy shard data"); err != nil {
+					shardFile.Close()
+					t.Fatalf("Failed to write shard file: %v", err)
+				}
+				if err := shardFile.Close(); err != nil {
+					t.Fatalf("Failed to close shard file: %v", err)
+				}
+			}
+
+			// Create .ecx file in idx directory
+			ecxFile, err := os.Create(idxBaseFileName + ".ecx")
+			if err != nil {
+				t.Fatalf("Failed to create .ecx file: %v", err)
+			}
+			if _, err := ecxFile.WriteString("dummy ecx data"); err != nil {
+				ecxFile.Close()
+				t.Fatalf("Failed to write .ecx file: %v", err)
+			}
+			if err := ecxFile.Close(); err != nil {
+				t.Fatalf("Failed to close .ecx file: %v", err)
+			}
+
+			// Create .ecj file in idx directory
+			ecjFile, err := os.Create(idxBaseFileName + ".ecj")
+			if err != nil {
+				t.Fatalf("Failed to create .ecj file: %v", err)
+			}
+			if _, err := ecjFile.WriteString("dummy ecj data"); err != nil {
+				ecjFile.Close()
+				t.Fatalf("Failed to write .ecj file: %v", err)
+			}
+			if err := ecjFile.Close(); err != nil {
+				t.Fatalf("Failed to close .ecj file: %v", err)
+			}
+
+			// Create .dat file in data directory (should NOT be removed)
+			datFile, err := os.Create(dataBaseFileName + ".dat")
+			if err != nil {
+				t.Fatalf("Failed to create .dat file: %v", err)
+			}
+			if _, err := datFile.WriteString("dummy dat data"); err != nil {
+				datFile.Close()
+				t.Fatalf("Failed to write .dat file: %v", err)
+			}
+			if err := datFile.Close(); err != nil {
+				t.Fatalf("Failed to close .dat file: %v", err)
+			}
+
+			// Call removeEcVolumeFiles
+			diskLocation.removeEcVolumeFiles(collection, volumeId)
+
+			// Verify all EC shard files are removed from data directory
+			for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+				shardFile := dataBaseFileName + erasure_coding.ToExt(i)
+				if util.FileExists(shardFile) {
+					t.Errorf("Shard file %d should be removed but still exists", i)
+				}
+			}
+
+			// Verify .ecx file is removed from idx directory
+			if util.FileExists(idxBaseFileName + ".ecx") {
+				t.Errorf(".ecx file should be removed but still exists")
+			}
+
+			// Verify .ecj file is removed from idx directory
+			if util.FileExists(idxBaseFileName + ".ecj") {
+				t.Errorf(".ecj file should be removed but still exists")
+			}
+
+			// Verify .dat file is NOT removed from data directory
+			if !util.FileExists(dataBaseFileName + ".dat") {
+				t.Errorf(".dat file should NOT be removed but was deleted")
+			}
+		})
+	}
+}
+
+// TestEcCleanupWithSeparateIdxDirectory tests EC cleanup when idx directory is different
+func TestEcCleanupWithSeparateIdxDirectory(t *testing.T) {
+	tempDir := t.TempDir()
+
+	idxDir := filepath.Join(tempDir, "idx")
+	dataDir := filepath.Join(tempDir, "data")
+	os.MkdirAll(idxDir, 0755)
+	os.MkdirAll(dataDir, 0755)
+
+	minFreeSpace := util.MinFreeSpace{Type: util.AsPercent, Percent: 1, Raw: "1"}
+	diskLocation := &DiskLocation{
+		Directory:     dataDir,
+		DirectoryUuid: "test-uuid",
+		IdxDirectory:  idxDir,
+		DiskType:      types.HddType,
+		MinFreeSpace:  minFreeSpace,
+	}
+	diskLocation.volumes = make(map[needle.VolumeId]*Volume)
+	diskLocation.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume)
+
+	volumeId := needle.VolumeId(400)
+	collection := ""
+
+	// Create shards in data directory (shards only go to Directory, not IdxDirectory)
+	dataBaseFileName := erasure_coding.EcShardFileName(collection, dataDir, int(volumeId))
+	for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+		shardFile, err := os.Create(dataBaseFileName + erasure_coding.ToExt(i))
+		if err != nil {
+			t.Fatalf("Failed to create shard file: %v", err)
+		}
+		if _, err := shardFile.WriteString("dummy shard data"); err != nil {
+			t.Fatalf("Failed to write shard file: %v", err)
+		}
close shard file: %v", err) + } + } + + // Create .dat in data directory + datFile, err := os.Create(dataBaseFileName + ".dat") + if err != nil { + t.Fatalf("Failed to create .dat file: %v", err) + } + if _, err := datFile.WriteString("dummy data"); err != nil { + t.Fatalf("Failed to write .dat file: %v", err) + } + if err := datFile.Close(); err != nil { + t.Fatalf("Failed to close .dat file: %v", err) + } + + // Do not create .ecx: trigger orphaned-shards cleanup when .dat exists + + // Run loadAllEcShards + loadErr := diskLocation.loadAllEcShards() + if loadErr != nil { + t.Logf("loadAllEcShards error: %v", loadErr) + } + + // Verify cleanup occurred in data directory (shards) + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + shardFile := dataBaseFileName + erasure_coding.ToExt(i) + if util.FileExists(shardFile) { + t.Errorf("Shard file %d should be cleaned up but still exists", i) + } + } + + // Verify .dat in data directory still exists (only EC files are cleaned up) + if !util.FileExists(dataBaseFileName + ".dat") { + t.Errorf(".dat file should remain but was deleted") + } +} + +// TestDistributedEcVolumeNoFileDeletion verifies that distributed EC volumes +// (where .dat is deleted) do NOT have their shard files deleted when load fails +// This tests the critical bug fix where DestroyEcVolume was incorrectly deleting files +func TestDistributedEcVolumeNoFileDeletion(t *testing.T) { + tempDir := t.TempDir() + + minFreeSpace := util.MinFreeSpace{Type: util.AsPercent, Percent: 1, Raw: "1"} + diskLocation := &DiskLocation{ + Directory: tempDir, + DirectoryUuid: "test-uuid", + IdxDirectory: tempDir, + DiskType: types.HddType, + MinFreeSpace: minFreeSpace, + ecVolumes: make(map[needle.VolumeId]*erasure_coding.EcVolume), + } + + collection := "" + volumeId := needle.VolumeId(500) + baseFileName := erasure_coding.EcShardFileName(collection, tempDir, int(volumeId)) + + // Create EC shards (only 5 shards - less than DataShardsCount, but OK for distributed EC) + numDistributedShards := 5 + for i := 0; i < numDistributedShards; i++ { + shardFile, err := os.Create(baseFileName + erasure_coding.ToExt(i)) + if err != nil { + t.Fatalf("Failed to create shard file: %v", err) + } + if _, err := shardFile.WriteString("dummy shard data"); err != nil { + shardFile.Close() + t.Fatalf("Failed to write shard file: %v", err) + } + if err := shardFile.Close(); err != nil { + t.Fatalf("Failed to close shard file: %v", err) + } + } + + // Create .ecx file to trigger EC loading + ecxFile, err := os.Create(baseFileName + ".ecx") + if err != nil { + t.Fatalf("Failed to create .ecx file: %v", err) + } + if _, err := ecxFile.WriteString("dummy ecx data"); err != nil { + ecxFile.Close() + t.Fatalf("Failed to write .ecx file: %v", err) + } + if err := ecxFile.Close(); err != nil { + t.Fatalf("Failed to close .ecx file: %v", err) + } + + // NO .dat file - this is a distributed EC volume + + // Run loadAllEcShards - this should fail but NOT delete shard files + loadErr := diskLocation.loadAllEcShards() + if loadErr != nil { + t.Logf("loadAllEcShards returned error (expected): %v", loadErr) + } + + // CRITICAL CHECK: Verify shard files still exist (should NOT be deleted) + for i := 0; i < 5; i++ { + shardFile := baseFileName + erasure_coding.ToExt(i) + if !util.FileExists(shardFile) { + t.Errorf("CRITICAL BUG: Shard file %s was deleted for distributed EC volume!", shardFile) + } + } + + // Verify .ecx file still exists (should NOT be deleted for distributed EC) + if !util.FileExists(baseFileName + ".ecx") { + 
t.Errorf("CRITICAL BUG: .ecx file was deleted for distributed EC volume!") + } + + t.Logf("SUCCESS: Distributed EC volume files preserved (not deleted)") +}