Browse Source

refactoring

pull/7384/head
chrislu 1 month ago
parent
commit
6fff6269c8
  1. 45
      weed/storage/disk_location_ec.go
  2. 37
      weed/storage/disk_location_ec_test.go

45
weed/storage/disk_location_ec.go

@ -207,6 +207,12 @@ func (l *DiskLocation) loadAllEcShards() (err error) {
// If .dat file is gone, this is likely a distributed EC volume with shards on multiple servers
baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(volumeId))
datFileName := baseFileName + ".dat"
// Helper to reset state between volume processing
reset := func() {
sameVolumeShards = nil
prevVolumeId = 0
}
datExists := util.FileExists(datFileName)
// Validate EC volume if .dat file exists (incomplete EC encoding scenario)
@ -217,8 +223,7 @@ func (l *DiskLocation) loadAllEcShards() (err error) {
l.removeEcVolumeFiles(collection, volumeId)
// Clean up any in-memory state. This does not delete files (already deleted by removeEcVolumeFiles).
l.unloadEcVolume(volumeId)
sameVolumeShards = nil
prevVolumeId = 0
reset()
continue
}
@ -233,12 +238,10 @@ func (l *DiskLocation) loadAllEcShards() (err error) {
}
// Clean up any partially loaded in-memory state. This does not delete files.
l.unloadEcVolume(volumeId)
sameVolumeShards = nil
prevVolumeId = 0
reset()
continue
}
prevVolumeId = 0
sameVolumeShards = nil
reset()
continue
}
@ -250,18 +253,8 @@ func (l *DiskLocation) loadAllEcShards() (err error) {
// We have collected EC shards but never found .ecx file
// Need to determine the collection name from the shard filenames
baseName := sameVolumeShards[0][:len(sameVolumeShards[0])-len(path.Ext(sameVolumeShards[0]))]
collection, volumeId, err := parseCollectionVolumeId(baseName)
if err == nil && volumeId == prevVolumeId {
baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(volumeId))
datFileName := baseFileName + ".dat"
// Only clean up if .dat file exists (incomplete encoding, not distributed EC)
if util.FileExists(datFileName) {
glog.Warningf("Found %d EC shards without .ecx file for volume %d (incomplete encoding interrupted before .ecx creation), cleaning up...",
len(sameVolumeShards), volumeId)
l.removeEcVolumeFiles(collection, volumeId)
// Clean up any in-memory state. This does not delete files (already deleted by removeEcVolumeFiles).
l.unloadEcVolume(volumeId)
}
if collection, volumeId, err := parseCollectionVolumeId(baseName); err == nil && volumeId == prevVolumeId {
l.cleanupIfIncomplete(collection, volumeId, len(sameVolumeShards))
}
}
@ -307,6 +300,20 @@ func (l *DiskLocation) EcShardCount() int {
return shardCount
}
// cleanupIfIncomplete removes EC files when .dat exists but .ecx is missing for a volume.
// This handles the case where EC encoding was interrupted before creating the .ecx file.
func (l *DiskLocation) cleanupIfIncomplete(collection string, vid needle.VolumeId, shardCount int) {
baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(vid))
datFileName := baseFileName + ".dat"
if util.FileExists(datFileName) {
glog.Warningf("Found %d EC shards without .ecx file for volume %d (incomplete encoding interrupted before .ecx creation), cleaning up...",
shardCount, vid)
l.removeEcVolumeFiles(collection, vid)
// Clean up any in-memory state. This does not delete files (already deleted by removeEcVolumeFiles).
l.unloadEcVolume(vid)
}
}
// calculateExpectedShardSize computes the exact expected shard size based on .dat file size
// The EC encoding process is deterministic:
// 1. Data is processed in batches of (LargeBlockSize * DataShardsCount) for large blocks
@ -347,6 +354,8 @@ func (l *DiskLocation) validateEcVolume(collection string, vid needle.VolumeId)
if datFileInfo, err := os.Stat(datFileName); err == nil {
datExists = true
expectedShardSize = calculateExpectedShardSize(datFileInfo.Size())
} else if !os.IsNotExist(err) {
glog.Warningf("Failed to stat .dat file %s: %v", datFileName, err)
}
shardCount := 0

37
weed/storage/disk_location_ec_test.go

@ -13,8 +13,6 @@ import (
// TestIncompleteEcEncodingCleanup tests the cleanup logic for incomplete EC encoding scenarios
func TestIncompleteEcEncodingCleanup(t *testing.T) {
tempDir := t.TempDir()
tests := []struct {
name string
volumeId needle.VolumeId
@ -96,6 +94,9 @@ func TestIncompleteEcEncodingCleanup(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Use per-subtest temp directory for stronger isolation
tempDir := t.TempDir()
// Create DiskLocation
minFreeSpace := util.MinFreeSpace{Type: util.AsPercent, Percent: 1, Raw: "1"}
diskLocation := &DiskLocation{
@ -123,8 +124,12 @@ func TestIncompleteEcEncodingCleanup(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create .dat file: %v", err)
}
datFile.Truncate(datFileSize)
datFile.Close()
if err := datFile.Truncate(datFileSize); err != nil {
t.Fatalf("Failed to truncate .dat file: %v", err)
}
if err := datFile.Close(); err != nil {
t.Fatalf("Failed to close .dat file: %v", err)
}
}
// Create EC shard files
@ -163,6 +168,12 @@ func TestIncompleteEcEncodingCleanup(t *testing.T) {
t.Logf("loadAllEcShards returned error (expected in some cases): %v", loadErr)
}
// Test idempotency - running again should not cause issues
loadErr2 := diskLocation.loadAllEcShards()
if loadErr2 != nil {
t.Logf("Second loadAllEcShards returned error: %v", loadErr2)
}
// Verify cleanup expectations
if tt.expectCleanup {
// Check that files were cleaned up
@ -195,6 +206,13 @@ func TestIncompleteEcEncodingCleanup(t *testing.T) {
}
}
// Verify load expectations
if tt.expectLoadSuccess {
if diskLocation.EcShardCount() == 0 {
t.Errorf("Expected EC shards to be loaded for volume %d", tt.volumeId)
}
}
})
}
}
@ -299,8 +317,12 @@ func TestValidateEcVolume(t *testing.T) {
t.Fatalf("Failed to create shard file: %v", err)
}
// Use truncate to create file of correct size without allocating all the space
shardFile.Truncate(expectedShardSize)
shardFile.Close()
if err := shardFile.Truncate(expectedShardSize); err != nil {
t.Fatalf("Failed to truncate shard file: %v", err)
}
if err := shardFile.Close(); err != nil {
t.Fatalf("Failed to close shard file: %v", err)
}
}
// For zero-byte test case, create 10 empty files
@ -439,8 +461,7 @@ func TestEcCleanupWithSeparateIdxDirectory(t *testing.T) {
datFile.WriteString("dummy data")
datFile.Close()
// Create .ecx and .ecj in idx directory (but no .ecx to trigger cleanup)
// Don't create .ecx to test orphaned shards cleanup
// Do not create .ecx: trigger orphaned-shards cleanup when .dat exists
// Run loadAllEcShards
loadErr := diskLocation.loadAllEcShards()

Loading…
Cancel
Save