diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go index 2859f7683..cc3469c73 100644 --- a/weed/storage/disk_location_ec.go +++ b/weed/storage/disk_location_ec.go @@ -207,6 +207,12 @@ func (l *DiskLocation) loadAllEcShards() (err error) { // If .dat file is gone, this is likely a distributed EC volume with shards on multiple servers baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(volumeId)) datFileName := baseFileName + ".dat" + // Helper to reset state between volume processing + reset := func() { + sameVolumeShards = nil + prevVolumeId = 0 + } + datExists := util.FileExists(datFileName) // Validate EC volume if .dat file exists (incomplete EC encoding scenario) @@ -217,8 +223,7 @@ func (l *DiskLocation) loadAllEcShards() (err error) { l.removeEcVolumeFiles(collection, volumeId) // Clean up any in-memory state. This does not delete files (already deleted by removeEcVolumeFiles). l.unloadEcVolume(volumeId) - sameVolumeShards = nil - prevVolumeId = 0 + reset() continue } @@ -233,12 +238,10 @@ func (l *DiskLocation) loadAllEcShards() (err error) { } // Clean up any partially loaded in-memory state. This does not delete files. l.unloadEcVolume(volumeId) - sameVolumeShards = nil - prevVolumeId = 0 + reset() continue } - prevVolumeId = 0 - sameVolumeShards = nil + reset() continue } @@ -250,18 +253,8 @@ func (l *DiskLocation) loadAllEcShards() (err error) { // We have collected EC shards but never found .ecx file // Need to determine the collection name from the shard filenames baseName := sameVolumeShards[0][:len(sameVolumeShards[0])-len(path.Ext(sameVolumeShards[0]))] - collection, volumeId, err := parseCollectionVolumeId(baseName) - if err == nil && volumeId == prevVolumeId { - baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(volumeId)) - datFileName := baseFileName + ".dat" - // Only clean up if .dat file exists (incomplete encoding, not distributed EC) - if util.FileExists(datFileName) { - glog.Warningf("Found %d EC shards without .ecx file for volume %d (incomplete encoding interrupted before .ecx creation), cleaning up...", - len(sameVolumeShards), volumeId) - l.removeEcVolumeFiles(collection, volumeId) - // Clean up any in-memory state. This does not delete files (already deleted by removeEcVolumeFiles). - l.unloadEcVolume(volumeId) - } + if collection, volumeId, err := parseCollectionVolumeId(baseName); err == nil && volumeId == prevVolumeId { + l.cleanupIfIncomplete(collection, volumeId, len(sameVolumeShards)) } } @@ -307,6 +300,20 @@ func (l *DiskLocation) EcShardCount() int { return shardCount } +// cleanupIfIncomplete removes EC files when .dat exists but .ecx is missing for a volume. +// This handles the case where EC encoding was interrupted before creating the .ecx file. +func (l *DiskLocation) cleanupIfIncomplete(collection string, vid needle.VolumeId, shardCount int) { + baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(vid)) + datFileName := baseFileName + ".dat" + if util.FileExists(datFileName) { + glog.Warningf("Found %d EC shards without .ecx file for volume %d (incomplete encoding interrupted before .ecx creation), cleaning up...", + shardCount, vid) + l.removeEcVolumeFiles(collection, vid) + // Clean up any in-memory state. This does not delete files (already deleted by removeEcVolumeFiles). + l.unloadEcVolume(vid) + } +} + // calculateExpectedShardSize computes the exact expected shard size based on .dat file size // The EC encoding process is deterministic: // 1. Data is processed in batches of (LargeBlockSize * DataShardsCount) for large blocks @@ -347,6 +354,8 @@ func (l *DiskLocation) validateEcVolume(collection string, vid needle.VolumeId) if datFileInfo, err := os.Stat(datFileName); err == nil { datExists = true expectedShardSize = calculateExpectedShardSize(datFileInfo.Size()) + } else if !os.IsNotExist(err) { + glog.Warningf("Failed to stat .dat file %s: %v", datFileName, err) } shardCount := 0 diff --git a/weed/storage/disk_location_ec_test.go b/weed/storage/disk_location_ec_test.go index 874e73da6..fad1612fa 100644 --- a/weed/storage/disk_location_ec_test.go +++ b/weed/storage/disk_location_ec_test.go @@ -13,8 +13,6 @@ import ( // TestIncompleteEcEncodingCleanup tests the cleanup logic for incomplete EC encoding scenarios func TestIncompleteEcEncodingCleanup(t *testing.T) { - tempDir := t.TempDir() - tests := []struct { name string volumeId needle.VolumeId @@ -96,6 +94,9 @@ func TestIncompleteEcEncodingCleanup(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + // Use per-subtest temp directory for stronger isolation + tempDir := t.TempDir() + // Create DiskLocation minFreeSpace := util.MinFreeSpace{Type: util.AsPercent, Percent: 1, Raw: "1"} diskLocation := &DiskLocation{ @@ -123,8 +124,12 @@ func TestIncompleteEcEncodingCleanup(t *testing.T) { if err != nil { t.Fatalf("Failed to create .dat file: %v", err) } - datFile.Truncate(datFileSize) - datFile.Close() + if err := datFile.Truncate(datFileSize); err != nil { + t.Fatalf("Failed to truncate .dat file: %v", err) + } + if err := datFile.Close(); err != nil { + t.Fatalf("Failed to close .dat file: %v", err) + } } // Create EC shard files @@ -163,6 +168,12 @@ func TestIncompleteEcEncodingCleanup(t *testing.T) { t.Logf("loadAllEcShards returned error (expected in some cases): %v", loadErr) } + // Test idempotency - running again should not cause issues + loadErr2 := diskLocation.loadAllEcShards() + if loadErr2 != nil { + t.Logf("Second loadAllEcShards returned error: %v", loadErr2) + } + // Verify cleanup expectations if tt.expectCleanup { // Check that files were cleaned up @@ -195,6 +206,13 @@ func TestIncompleteEcEncodingCleanup(t *testing.T) { } } + // Verify load expectations + if tt.expectLoadSuccess { + if diskLocation.EcShardCount() == 0 { + t.Errorf("Expected EC shards to be loaded for volume %d", tt.volumeId) + } + } + }) } } @@ -299,8 +317,12 @@ func TestValidateEcVolume(t *testing.T) { t.Fatalf("Failed to create shard file: %v", err) } // Use truncate to create file of correct size without allocating all the space - shardFile.Truncate(expectedShardSize) - shardFile.Close() + if err := shardFile.Truncate(expectedShardSize); err != nil { + t.Fatalf("Failed to truncate shard file: %v", err) + } + if err := shardFile.Close(); err != nil { + t.Fatalf("Failed to close shard file: %v", err) + } } // For zero-byte test case, create 10 empty files @@ -439,8 +461,7 @@ func TestEcCleanupWithSeparateIdxDirectory(t *testing.T) { datFile.WriteString("dummy data") datFile.Close() - // Create .ecx and .ecj in idx directory (but no .ecx to trigger cleanup) - // Don't create .ecx to test orphaned shards cleanup + // Do not create .ecx: trigger orphaned-shards cleanup when .dat exists // Run loadAllEcShards loadErr := diskLocation.loadAllEcShards()