Browse Source

refactor

pull/7384/head
chrislu 1 month ago
parent
commit
d2822ae7e6
  1. 54
      weed/storage/disk_location_ec.go
  2. 53
      weed/storage/disk_location_ec_test.go

54
weed/storage/disk_location_ec.go

@ -202,6 +202,8 @@ func (l *DiskLocation) loadAllEcShards() (err error) {
if datExists && len(sameVolumeShards) < erasure_coding.DataShardsCount {
glog.Warningf("Incomplete EC encoding for volume %d: .dat exists but only %d shards found (need at least %d), cleaning up EC files...",
volumeId, len(sameVolumeShards), erasure_coding.DataShardsCount)
// Clean up any in-memory state before removing files
l.DestroyEcVolume(volumeId)
l.removeEcVolumeFiles(collection, volumeId)
sameVolumeShards = nil
prevVolumeId = 0
@ -302,9 +304,16 @@ func (l *DiskLocation) validateEcVolume(collection string, vid needle.VolumeId)
baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(vid))
datFileName := baseFileName + ".dat"
datExists := util.FileExists(datFileName)
shardCount := 0
// Count existing EC shard files
// If .dat is gone, it's a distributed EC volume - any shard count is fine
// Short-circuit to avoid unnecessary stat calls
if !datExists {
glog.V(1).Infof("EC volume %d: distributed EC (.dat removed)", vid)
return true
}
// .dat file exists, so we need to validate shard count for local EC
shardCount := 0
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
shardFileName := baseFileName + erasure_coding.ToExt(i)
if fi, err := os.Stat(shardFileName); err == nil {
@ -317,12 +326,6 @@ func (l *DiskLocation) validateEcVolume(collection string, vid needle.VolumeId)
}
}
// If .dat file is gone, this is a distributed EC volume - any shard count is valid
if !datExists {
glog.V(1).Infof("EC volume %d has %d shards (distributed EC, .dat removed)", vid, shardCount)
return true
}
// If .dat file exists, we need at least DataShardsCount shards locally
// Otherwise it's an incomplete EC encoding that should be cleaned up
if shardCount < erasure_coding.DataShardsCount {
@ -339,31 +342,26 @@ func (l *DiskLocation) removeEcVolumeFiles(collection string, vid needle.VolumeI
baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(vid))
indexBaseFileName := erasure_coding.EcShardFileName(collection, l.IdxDirectory, int(vid))
// Remove all EC shard files (.ec00 ~ .ec13)
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
shardFileName := baseFileName + erasure_coding.ToExt(i)
if err := os.Remove(shardFileName); err != nil {
// Helper to remove a file with consistent error handling
removeFile := func(path, description string) {
if err := os.Remove(path); err != nil {
if !os.IsNotExist(err) {
glog.Warningf("Failed to remove incomplete EC shard file %s: %v", shardFileName, err)
glog.Warningf("Failed to remove incomplete %s %s: %v", description, path, err)
}
} else {
glog.V(2).Infof("Removed incomplete EC shard file: %s", shardFileName)
glog.V(2).Infof("Removed incomplete %s: %s", description, path)
}
}
// Remove index files
if err := os.Remove(indexBaseFileName + ".ecx"); err != nil {
if !os.IsNotExist(err) {
glog.Warningf("Failed to remove incomplete EC index file %s.ecx: %v", indexBaseFileName, err)
}
} else {
glog.V(2).Infof("Removed incomplete EC index file: %s.ecx", indexBaseFileName)
}
if err := os.Remove(indexBaseFileName + ".ecj"); err != nil {
if !os.IsNotExist(err) {
glog.Warningf("Failed to remove incomplete EC journal file %s.ecj: %v", indexBaseFileName, err)
}
} else {
glog.V(2).Infof("Removed incomplete EC journal file: %s.ecj", indexBaseFileName)
// Remove index files first (.ecx, .ecj) before shard files
// This ensures that if cleanup is interrupted, the .ecx file won't trigger
// EC loading for incomplete/missing shards on next startup
removeFile(indexBaseFileName+".ecx", "EC index file")
removeFile(indexBaseFileName+".ecj", "EC journal file")
// Remove all EC shard files (.ec00 ~ .ec13)
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
shardFileName := baseFileName + erasure_coding.ToExt(i)
removeFile(shardFileName, "EC shard file")
}
}

53
weed/storage/disk_location_ec_test.go

@ -13,12 +13,7 @@ import (
// TestIncompleteEcEncodingCleanup tests the cleanup logic for incomplete EC encoding scenarios
func TestIncompleteEcEncodingCleanup(t *testing.T) {
// Create temporary test directory
tempDir, err := os.MkdirTemp("", "ec_cleanup_test")
if err != nil {
t.Fatalf("Failed to create temp dir: %v", err)
}
defer os.RemoveAll(tempDir)
tempDir := t.TempDir()
tests := []struct {
name string
@ -159,9 +154,9 @@ func TestIncompleteEcEncodingCleanup(t *testing.T) {
}
// Run loadAllEcShards
err = diskLocation.loadAllEcShards()
if err != nil {
t.Logf("loadAllEcShards returned error (expected in some cases): %v", err)
loadErr := diskLocation.loadAllEcShards()
if loadErr != nil {
t.Logf("loadAllEcShards returned error (expected in some cases): %v", loadErr)
}
// Verify cleanup expectations
@ -196,24 +191,13 @@ func TestIncompleteEcEncodingCleanup(t *testing.T) {
}
}
// Cleanup test files for next iteration
os.Remove(baseFileName + ".dat")
os.Remove(baseFileName + ".ecx")
os.Remove(baseFileName + ".ecj")
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
os.Remove(baseFileName + erasure_coding.ToExt(i))
}
})
}
}
// TestValidateEcVolume tests the validateEcVolume function
func TestValidateEcVolume(t *testing.T) {
tempDir, err := os.MkdirTemp("", "ec_validate_test")
if err != nil {
t.Fatalf("Failed to create temp dir: %v", err)
}
defer os.RemoveAll(tempDir)
tempDir := t.TempDir()
minFreeSpace := util.MinFreeSpace{Type: util.AsPercent, Percent: 1, Raw: "1"}
diskLocation := &DiskLocation{
@ -315,23 +299,13 @@ func TestValidateEcVolume(t *testing.T) {
if isValid != tt.expectValid {
t.Errorf("Expected validation result %v but got %v", tt.expectValid, isValid)
}
// Cleanup
os.Remove(baseFileName + ".dat")
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
os.Remove(baseFileName + erasure_coding.ToExt(i))
}
})
}
}
// TestRemoveEcVolumeFiles tests the removeEcVolumeFiles function
func TestRemoveEcVolumeFiles(t *testing.T) {
tempDir, err := os.MkdirTemp("", "ec_remove_test")
if err != nil {
t.Fatalf("Failed to create temp dir: %v", err)
}
defer os.RemoveAll(tempDir)
tempDir := t.TempDir()
minFreeSpace := util.MinFreeSpace{Type: util.AsPercent, Percent: 1, Raw: "1"}
diskLocation := &DiskLocation{
@ -392,18 +366,11 @@ func TestRemoveEcVolumeFiles(t *testing.T) {
if !util.FileExists(baseFileName + ".dat") {
t.Errorf(".dat file should NOT be removed but was deleted")
}
// Cleanup
os.Remove(baseFileName + ".dat")
}
// TestEcCleanupWithSeparateIdxDirectory tests EC cleanup when idx directory is different
func TestEcCleanupWithSeparateIdxDirectory(t *testing.T) {
tempDir, err := os.MkdirTemp("", "ec_cleanup_idx_test")
if err != nil {
t.Fatalf("Failed to create temp dir: %v", err)
}
defer os.RemoveAll(tempDir)
tempDir := t.TempDir()
idxDir := filepath.Join(tempDir, "idx")
dataDir := filepath.Join(tempDir, "data")
@ -441,9 +408,9 @@ func TestEcCleanupWithSeparateIdxDirectory(t *testing.T) {
// Don't create .ecx to test orphaned shards cleanup
// Run loadAllEcShards
err = diskLocation.loadAllEcShards()
if err != nil {
t.Logf("loadAllEcShards error: %v", err)
loadErr := diskLocation.loadAllEcShards()
if loadErr != nil {
t.Logf("loadAllEcShards error: %v", loadErr)
}
// Verify cleanup occurred

Loading…
Cancel
Save