
Volume Server: handle incomplete ec encoding (#7384)

* handle incomplete ec encoding

* unit tests

* simplify, and better logs

* Update disk_location_ec.go

When loadEcShards() fails partway through, some EC shards may already be loaded into the l.ecVolumes map in memory. The previous code only cleaned up filesystem files but left orphaned in-memory state, which could cause memory leaks and inconsistent state.
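
Concretely, the fix adds an unloadEcVolume helper (see the disk_location_ec.go diff below) and calls it on the failure paths in handleFoundEcxFile, roughly:

	if err := l.loadEcShards(shards, collection, volumeId); err != nil {
		if datExists {
			// fall back to the .dat volume: drop in-memory state, then delete the EC files
			l.unloadEcVolume(volumeId)
			l.removeEcVolumeFiles(collection, volumeId)
		} else {
			// likely a distributed EC volume: drop in-memory state only, keep shard files on disk
			l.unloadEcVolume(volumeId)
		}
	}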

* address comments

* Performance: Avoid Double os.Stat() Call
* Platform Compatibility: Use filepath.Join

* in memory cleanup

* Update disk_location_ec.go

* refactor

* Added Shard Size Validation

* check ec shard sizes

* validate shard size

* calculate expected shard size

* refactoring

* minor

* fix shard directory

* 10GB sparse files can be slow or fail on non-sparse FS. Use 10MB to hit SmallBlockSize math (1MB shards) deterministically.

* grouping logic should be updated to use both collection and volumeId to ensure correctness

* unexpected error

* handle exceptions in tests; use constants

* The check for orphaned shards should be performed for the previous volume before resetting sameVolumeShards for the new volume.

* address comments

* Eliminated Redundant Parsing in checkOrphanedShards

* minor

* Avoid misclassifying local EC as distributed when .dat stat errors occur; also standardize unload-before-remove.

* fmt

* refactor

* refactor

* adjust to warning
Chris Lu committed via GitHub
commit 0813138d57
Changed files:
  29   weed/storage/disk_location.go
  252  weed/storage/disk_location_ec.go
  198  weed/storage/disk_location_ec_realworld_test.go
  195  weed/storage/disk_location_ec_shard_size_test.go
  643  weed/storage/disk_location_ec_test.go

29  weed/storage/disk_location.go

@@ -144,10 +144,26 @@ func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind Ne
return false
}
// skip if ec volumes exists
// parse out collection, volume id (moved up to use in EC validation)
vid, collection, err := volumeIdFromFileName(basename)
if err != nil {
glog.Warningf("get volume id failed, %s, err : %s", volumeName, err)
return false
}
// skip if ec volumes exists, but validate EC files first
if skipIfEcVolumesExists {
if util.FileExists(l.IdxDirectory + "/" + volumeName + ".ecx") {
return false
ecxFilePath := filepath.Join(l.IdxDirectory, volumeName+".ecx")
if util.FileExists(ecxFilePath) {
// Check if EC volume is valid by verifying shard count
if !l.validateEcVolume(collection, vid) {
glog.Warningf("EC volume %d validation failed, removing incomplete EC files to allow .dat file loading", vid)
l.removeEcVolumeFiles(collection, vid)
// Continue to load .dat file
} else {
// Valid EC volume exists, skip .dat file
return false
}
}
}
@@ -161,13 +177,6 @@ func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind Ne
return false
}
// parse out collection, volume id
vid, collection, err := volumeIdFromFileName(basename)
if err != nil {
glog.Warningf("get volume id failed, %s, err : %s", volumeName, err)
return false
}
// avoid loading one volume more than once
l.volumesLock.RLock()
_, found := l.volumes[vid]

252  weed/storage/disk_location_ec.go

@@ -10,6 +10,7 @@ import (
"slices"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)
@@ -40,6 +41,23 @@ func (l *DiskLocation) DestroyEcVolume(vid needle.VolumeId) {
}
}
// unloadEcVolume removes an EC volume from memory without deleting its files on disk.
// This is useful for distributed EC volumes where shards may be on other servers.
func (l *DiskLocation) unloadEcVolume(vid needle.VolumeId) {
var toClose *erasure_coding.EcVolume
l.ecVolumesLock.Lock()
if ecVolume, found := l.ecVolumes[vid]; found {
toClose = ecVolume
delete(l.ecVolumes, vid)
}
l.ecVolumesLock.Unlock()
// Close outside the lock to avoid holding write lock during I/O
if toClose != nil {
toClose.Close()
}
}
func (l *DiskLocation) CollectEcShards(vid needle.VolumeId, shardFileNames []string) (ecVolume *erasure_coding.EcVolume, found bool) {
l.ecVolumesLock.RLock()
defer l.ecVolumesLock.RUnlock()
@@ -154,8 +172,18 @@ func (l *DiskLocation) loadAllEcShards() (err error) {
slices.SortFunc(dirEntries, func(a, b os.DirEntry) int {
return strings.Compare(a.Name(), b.Name())
})
var sameVolumeShards []string
var prevVolumeId needle.VolumeId
var prevCollection string
// Helper to reset state between volume processing
reset := func() {
sameVolumeShards = nil
prevVolumeId = 0
prevCollection = ""
}
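// Example (hypothetical file names): after sorting, entries like 100.ec00 ... 100.ec13, 100.ecx,
// 101.ec00 ... 101.ec04 (no 101.ecx) are processed in order: the volume 100 group is handed to
// handleFoundEcxFile when 100.ecx is reached, while the incomplete volume 101 group is passed to
// checkOrphanedShards, either when the next group starts or at the end of the scan.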
for _, fileInfo := range dirEntries {
if fileInfo.IsDir() {
continue
@@ -178,24 +206,31 @@
// 0 byte files should be only appearing erroneously for ec data files
// so we ignore them
if re.MatchString(ext) && info.Size() > 0 {
if prevVolumeId == 0 || volumeId == prevVolumeId {
// Group shards by both collection and volumeId to avoid mixing collections
if prevVolumeId == 0 || (volumeId == prevVolumeId && collection == prevCollection) {
sameVolumeShards = append(sameVolumeShards, fileInfo.Name())
} else {
// Before starting a new group, check if previous group had orphaned shards
l.checkOrphanedShards(sameVolumeShards, prevCollection, prevVolumeId)
sameVolumeShards = []string{fileInfo.Name()}
}
prevVolumeId = volumeId
prevCollection = collection
continue
}
if ext == ".ecx" && volumeId == prevVolumeId {
if err = l.loadEcShards(sameVolumeShards, collection, volumeId); err != nil {
return fmt.Errorf("loadEcShards collection:%v volumeId:%d : %v", collection, volumeId, err)
}
prevVolumeId = volumeId
if ext == ".ecx" && volumeId == prevVolumeId && collection == prevCollection {
l.handleFoundEcxFile(sameVolumeShards, collection, volumeId)
reset()
continue
}
}
// Check for orphaned EC shards without .ecx file at the end of the directory scan
// This handles the last group of shards in the directory
l.checkOrphanedShards(sameVolumeShards, prevCollection, prevVolumeId)
return nil
}
@@ -237,3 +272,208 @@ func (l *DiskLocation) EcShardCount() int {
}
return shardCount
}
// handleFoundEcxFile processes a complete group of EC shards when their .ecx file is found.
// This includes validation, loading, and cleanup of incomplete/invalid EC volumes.
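// In short, the logic below is:
//   .dat present and validateEcVolume fails  -> remove the EC files, fall back to the .dat volume
//   loadEcShards fails and .dat present      -> unload in-memory state, remove the EC files, fall back to .dat
//   loadEcShards fails and .dat absent       -> unload in-memory state only; shard files are kept (likely distributed EC)
//   otherwise                                -> the EC volume is loaded normally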
func (l *DiskLocation) handleFoundEcxFile(shards []string, collection string, volumeId needle.VolumeId) {
// Check if this is an incomplete EC encoding (not a distributed EC volume)
// Key distinction: if .dat file still exists, EC encoding may have failed
// If .dat file is gone, this is likely a distributed EC volume with shards on multiple servers
baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(volumeId))
datFileName := baseFileName + ".dat"
// Determine .dat presence robustly; unexpected errors are treated as "exists"
datExists := l.checkDatFileExists(datFileName)
// Validate EC volume if .dat file exists (incomplete EC encoding scenario)
// This checks shard count, shard size consistency, and expected size vs .dat file
// If .dat is gone, EC encoding completed and shards are distributed across servers
if datExists && !l.validateEcVolume(collection, volumeId) {
glog.Warningf("Incomplete or invalid EC volume %d: .dat exists but validation failed, cleaning up EC files...", volumeId)
l.removeEcVolumeFiles(collection, volumeId)
return
}
// Attempt to load the EC shards
if err := l.loadEcShards(shards, collection, volumeId); err != nil {
// If EC shards failed to load and .dat still exists, clean up EC files to allow .dat file to be used
// If .dat is gone, log error but don't clean up (may be waiting for shards from other servers)
if datExists {
glog.Warningf("Failed to load EC shards for volume %d and .dat exists: %v, cleaning up EC files to use .dat...", volumeId, err)
// Unload first to release FDs, then remove files
l.unloadEcVolume(volumeId)
l.removeEcVolumeFiles(collection, volumeId)
} else {
glog.Warningf("Failed to load EC shards for volume %d: %v (this may be normal for distributed EC volumes)", volumeId, err)
// Clean up any partially loaded in-memory state. This does not delete files.
l.unloadEcVolume(volumeId)
}
return
}
}
// checkDatFileExists checks if .dat file exists with robust error handling.
// Unexpected errors (permission, I/O) are treated as "exists" to avoid misclassifying
// local EC as distributed EC, which is the safer fallback.
func (l *DiskLocation) checkDatFileExists(datFileName string) bool {
if _, err := os.Stat(datFileName); err == nil {
return true
} else if !os.IsNotExist(err) {
glog.Warningf("Failed to stat .dat file %s: %v", datFileName, err)
// Safer to assume local .dat exists to avoid misclassifying as distributed EC
return true
}
return false
}
// checkOrphanedShards checks if the given shards are orphaned (no .ecx file) and cleans them up if needed.
// Returns true if orphaned shards were found and cleaned up.
// This handles the case where EC encoding was interrupted before creating the .ecx file.
func (l *DiskLocation) checkOrphanedShards(shards []string, collection string, volumeId needle.VolumeId) bool {
if len(shards) == 0 || volumeId == 0 {
return false
}
// Check if .dat file exists (incomplete encoding, not distributed EC)
baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(volumeId))
datFileName := baseFileName + ".dat"
if l.checkDatFileExists(datFileName) {
glog.Warningf("Found %d EC shards without .ecx file for volume %d (incomplete encoding interrupted before .ecx creation), cleaning up...",
len(shards), volumeId)
l.removeEcVolumeFiles(collection, volumeId)
return true
}
return false
}
// calculateExpectedShardSize computes the exact expected shard size based on .dat file size
// The EC encoding process is deterministic:
// 1. Data is processed in batches of (LargeBlockSize * DataShardsCount) for large blocks
// 2. Remaining data is processed in batches of (SmallBlockSize * DataShardsCount) for small blocks
// 3. Each shard gets exactly its portion, with zero-padding applied to incomplete blocks
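// Worked examples (these match the cases in disk_location_ec_shard_size_test.go below):
//   10MB .dat:   0 large batches; ceil(10MB / (1MB * 10)) = 1 small block -> 1MB per shard
//   10.5GB .dat: 1 large batch -> 1GB, then ceil(512MB / 10MB) = 52 small blocks -> 1GB + 52MB per shard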
func calculateExpectedShardSize(datFileSize int64) int64 {
var shardSize int64
// Process large blocks (1GB * 10 = 10GB batches)
largeBatchSize := int64(erasure_coding.ErasureCodingLargeBlockSize) * int64(erasure_coding.DataShardsCount)
numLargeBatches := datFileSize / largeBatchSize
shardSize = numLargeBatches * int64(erasure_coding.ErasureCodingLargeBlockSize)
remainingSize := datFileSize - (numLargeBatches * largeBatchSize)
// Process remaining data in small blocks (1MB * 10 = 10MB batches)
if remainingSize > 0 {
smallBatchSize := int64(erasure_coding.ErasureCodingSmallBlockSize) * int64(erasure_coding.DataShardsCount)
numSmallBatches := (remainingSize + smallBatchSize - 1) / smallBatchSize // Ceiling division
shardSize += numSmallBatches * int64(erasure_coding.ErasureCodingSmallBlockSize)
}
return shardSize
}
// validateEcVolume checks if EC volume has enough shards to be functional
// For distributed EC volumes (where .dat is deleted), any number of shards is valid
// For incomplete EC encoding (where .dat still exists), we need at least DataShardsCount shards
// Also validates that all shards have the same size (required for Reed-Solomon EC)
// If .dat exists, it also validates shards match the expected size based on .dat file size
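// In short:
//   .dat absent                                                    -> valid (distributed EC), any shard count
//   .dat present, fewer than DataShardsCount non-empty shards      -> invalid
//   .dat present, unequal shard sizes or sizes not matching the size derived from .dat -> invalid
//   unexpected stat errors (permission, I/O) on .dat or a shard    -> invalid (fail safe)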
func (l *DiskLocation) validateEcVolume(collection string, vid needle.VolumeId) bool {
baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(vid))
datFileName := baseFileName + ".dat"
var expectedShardSize int64 = -1
datExists := false
// If .dat file exists, compute exact expected shard size from it
if datFileInfo, err := os.Stat(datFileName); err == nil {
datExists = true
expectedShardSize = calculateExpectedShardSize(datFileInfo.Size())
} else if !os.IsNotExist(err) {
// If stat fails with unexpected error (permission, I/O), fail validation
// Don't treat this as "distributed EC" - it could be a temporary error
glog.Warningf("Failed to stat .dat file %s: %v", datFileName, err)
return false
}
shardCount := 0
var actualShardSize int64 = -1
// Count shards and validate they all have the same size (required for Reed-Solomon EC)
// Shard files (.ec00 - .ec13) are always in l.Directory, not l.IdxDirectory
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
shardFileName := baseFileName + erasure_coding.ToExt(i)
fi, err := os.Stat(shardFileName)
if err == nil {
// Check if file has non-zero size
if fi.Size() > 0 {
// Validate all shards are the same size (required for Reed-Solomon EC)
if actualShardSize == -1 {
actualShardSize = fi.Size()
} else if fi.Size() != actualShardSize {
glog.Warningf("EC volume %d shard %d has size %d, expected %d (all EC shards must be same size)",
vid, i, fi.Size(), actualShardSize)
return false
}
shardCount++
}
} else if !os.IsNotExist(err) {
// If stat fails with unexpected error (permission, I/O), fail validation
// This is consistent with .dat file error handling
glog.Warningf("Failed to stat shard file %s: %v", shardFileName, err)
return false
}
}
// If .dat file exists, validate shard size matches expected size
if datExists && actualShardSize > 0 && expectedShardSize > 0 {
if actualShardSize != expectedShardSize {
glog.Warningf("EC volume %d: shard size %d doesn't match expected size %d (based on .dat file size)",
vid, actualShardSize, expectedShardSize)
return false
}
}
// If .dat file is gone, this is a distributed EC volume - any shard count is valid
if !datExists {
glog.V(1).Infof("EC volume %d: distributed EC (.dat removed) with %d shards", vid, shardCount)
return true
}
// If .dat file exists, we need at least DataShardsCount shards locally
// Otherwise it's an incomplete EC encoding that should be cleaned up
if shardCount < erasure_coding.DataShardsCount {
glog.Warningf("EC volume %d has .dat file but only %d shards (need at least %d for local EC)",
vid, shardCount, erasure_coding.DataShardsCount)
return false
}
return true
}
// removeEcVolumeFiles removes all EC-related files for a volume
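// Note: only the EC artifacts (.ecx, .ecj, .ec00 ~ .ec13) are removed; the volume's .dat file
// is intentionally left untouched so the original volume can still be loaded afterwards.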
func (l *DiskLocation) removeEcVolumeFiles(collection string, vid needle.VolumeId) {
baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(vid))
indexBaseFileName := erasure_coding.EcShardFileName(collection, l.IdxDirectory, int(vid))
// Helper to remove a file with consistent error handling
removeFile := func(filePath, description string) {
if err := os.Remove(filePath); err != nil {
if !os.IsNotExist(err) {
glog.Warningf("Failed to remove incomplete %s %s: %v", description, filePath, err)
}
} else {
glog.V(2).Infof("Removed incomplete %s: %s", description, filePath)
}
}
// Remove index files first (.ecx, .ecj) before shard files
// This ensures that if cleanup is interrupted, the .ecx file won't trigger
// EC loading for incomplete/missing shards on next startup
removeFile(indexBaseFileName+".ecx", "EC index file")
removeFile(indexBaseFileName+".ecj", "EC journal file")
// Remove all EC shard files (.ec00 ~ .ec13) from data directory
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
removeFile(baseFileName+erasure_coding.ToExt(i), "EC shard file")
}
}

198  weed/storage/disk_location_ec_realworld_test.go

@@ -0,0 +1,198 @@
package storage
import (
"os"
"path/filepath"
"testing"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
)
// TestCalculateExpectedShardSizeWithRealEncoding validates our shard size calculation
// by actually running EC encoding on real files and comparing the results
func TestCalculateExpectedShardSizeWithRealEncoding(t *testing.T) {
tempDir := t.TempDir()
tests := []struct {
name string
datFileSize int64
description string
}{
{
name: "5MB file",
datFileSize: 5 * 1024 * 1024,
description: "Small file that needs 1 small block per shard",
},
{
name: "10MB file (exactly 10 small blocks)",
datFileSize: 10 * 1024 * 1024,
description: "Exactly fits in 1MB small blocks",
},
{
name: "15MB file",
datFileSize: 15 * 1024 * 1024,
description: "Requires 2 small blocks per shard",
},
{
name: "50MB file",
datFileSize: 50 * 1024 * 1024,
description: "Requires 5 small blocks per shard",
},
{
name: "100MB file",
datFileSize: 100 * 1024 * 1024,
description: "Requires 10 small blocks per shard",
},
{
name: "512MB file",
datFileSize: 512 * 1024 * 1024,
description: "Requires 52 small blocks per shard (rounded up)",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create a test .dat file with the specified size
baseFileName := filepath.Join(tempDir, "test_volume")
datFileName := baseFileName + ".dat"
// Create .dat file with random data pattern (so it's compressible but realistic)
datFile, err := os.Create(datFileName)
if err != nil {
t.Fatalf("Failed to create .dat file: %v", err)
}
// Write some pattern data (not all zeros, to be more realistic)
pattern := make([]byte, 4096)
for i := range pattern {
pattern[i] = byte(i % 256)
}
written := int64(0)
for written < tt.datFileSize {
toWrite := tt.datFileSize - written
if toWrite > int64(len(pattern)) {
toWrite = int64(len(pattern))
}
n, err := datFile.Write(pattern[:toWrite])
if err != nil {
t.Fatalf("Failed to write to .dat file: %v", err)
}
written += int64(n)
}
datFile.Close()
// Calculate expected shard size using our function
expectedShardSize := calculateExpectedShardSize(tt.datFileSize)
// Run actual EC encoding
err = erasure_coding.WriteEcFiles(baseFileName)
if err != nil {
t.Fatalf("Failed to encode EC files: %v", err)
}
// Measure actual shard sizes
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
shardFileName := baseFileName + erasure_coding.ToExt(i)
shardInfo, err := os.Stat(shardFileName)
if err != nil {
t.Fatalf("Failed to stat shard file %s: %v", shardFileName, err)
}
actualShardSize := shardInfo.Size()
// Verify actual size matches expected size
if actualShardSize != expectedShardSize {
t.Errorf("Shard %d size mismatch:\n"+
" .dat file size: %d bytes\n"+
" Expected shard size: %d bytes\n"+
" Actual shard size: %d bytes\n"+
" Difference: %d bytes\n"+
" %s",
i, tt.datFileSize, expectedShardSize, actualShardSize,
actualShardSize-expectedShardSize, tt.description)
}
}
// If we got here, all shards match!
t.Logf("✓ SUCCESS: .dat size %d → actual shard size %d matches calculated size (%s)",
tt.datFileSize, expectedShardSize, tt.description)
// Cleanup
os.Remove(datFileName)
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
os.Remove(baseFileName + erasure_coding.ToExt(i))
}
})
}
}
// TestCalculateExpectedShardSizeEdgeCases tests edge cases with real encoding
func TestCalculateExpectedShardSizeEdgeCases(t *testing.T) {
tempDir := t.TempDir()
tests := []struct {
name string
datFileSize int64
}{
{"1 byte file", 1},
{"1KB file", 1024},
{"10KB file", 10 * 1024},
{"1MB file (1 small block)", 1024 * 1024},
{"1MB + 1 byte", 1024*1024 + 1},
{"9.9MB (almost 1 small block per shard)", 9*1024*1024 + 900*1024},
{"10.1MB (just over 1 small block per shard)", 10*1024*1024 + 100*1024},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
baseFileName := filepath.Join(tempDir, tt.name)
datFileName := baseFileName + ".dat"
// Create .dat file
datFile, err := os.Create(datFileName)
if err != nil {
t.Fatalf("Failed to create .dat file: %v", err)
}
// Write exactly the specified number of bytes
data := make([]byte, tt.datFileSize)
for i := range data {
data[i] = byte(i % 256)
}
datFile.Write(data)
datFile.Close()
// Calculate expected
expectedShardSize := calculateExpectedShardSize(tt.datFileSize)
// Run actual EC encoding
err = erasure_coding.WriteEcFiles(baseFileName)
if err != nil {
t.Fatalf("Failed to encode EC files: %v", err)
}
// Check first shard (all should be same size)
shardFileName := baseFileName + erasure_coding.ToExt(0)
shardInfo, err := os.Stat(shardFileName)
if err != nil {
t.Fatalf("Failed to stat shard file: %v", err)
}
actualShardSize := shardInfo.Size()
if actualShardSize != expectedShardSize {
t.Errorf("File size %d: expected shard %d, got %d (diff: %d)",
tt.datFileSize, expectedShardSize, actualShardSize, actualShardSize-expectedShardSize)
} else {
t.Logf("✓ File size %d → shard size %d (correct)", tt.datFileSize, actualShardSize)
}
// Cleanup
os.Remove(datFileName)
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
os.Remove(baseFileName + erasure_coding.ToExt(i))
}
})
}
}

195  weed/storage/disk_location_ec_shard_size_test.go

@@ -0,0 +1,195 @@
package storage
import (
"testing"
)
func TestCalculateExpectedShardSize(t *testing.T) {
const (
largeBlock = 1024 * 1024 * 1024 // 1GB
smallBlock = 1024 * 1024 // 1MB
dataShards = 10
largeBatchSize = largeBlock * dataShards // 10GB
smallBatchSize = smallBlock * dataShards // 10MB
)
tests := []struct {
name string
datFileSize int64
expectedShardSize int64
description string
}{
// Edge case: empty file
{
name: "0 bytes (empty file)",
datFileSize: 0,
expectedShardSize: 0,
description: "Empty file has 0 shard size",
},
// Boundary tests: exact multiples of large block
{
name: "Exact 10GB (1 large batch)",
datFileSize: largeBatchSize, // 10GB = 1 large batch
expectedShardSize: largeBlock, // 1GB per shard
description: "Exactly fits in large blocks",
},
{
name: "Exact 20GB (2 large batches)",
datFileSize: 2 * largeBatchSize, // 20GB
expectedShardSize: 2 * largeBlock, // 2GB per shard
description: "2 complete large batches",
},
{
name: "Just under large batch (10GB - 1 byte)",
datFileSize: largeBatchSize - 1, // 10,737,418,239 bytes
expectedShardSize: 1024 * smallBlock, // 1024MB = 1GB (needs 1024 small blocks)
description: "Just under 10GB needs 1024 small blocks",
},
{
name: "Just over large batch (10GB + 1 byte)",
datFileSize: largeBatchSize + 1, // 10GB + 1 byte
expectedShardSize: largeBlock + smallBlock, // 1GB + 1MB
description: "Just over 10GB adds 1 small block",
},
// Boundary tests: exact multiples of small batch
{
name: "Exact 10MB (1 small batch)",
datFileSize: smallBatchSize, // 10MB
expectedShardSize: smallBlock, // 1MB per shard
description: "Exactly fits in 1 small batch",
},
{
name: "Exact 20MB (2 small batches)",
datFileSize: 2 * smallBatchSize, // 20MB
expectedShardSize: 2 * smallBlock, // 2MB per shard
description: "2 complete small batches",
},
{
name: "Just under small batch (10MB - 1 byte)",
datFileSize: smallBatchSize - 1, // 10MB - 1 byte
expectedShardSize: smallBlock, // Still needs 1MB per shard (rounds up)
description: "Just under 10MB rounds up to 1 small block",
},
{
name: "Just over small batch (10MB + 1 byte)",
datFileSize: smallBatchSize + 1, // 10MB + 1 byte
expectedShardSize: 2 * smallBlock, // 2MB per shard
description: "Just over 10MB needs 2 small blocks",
},
// Mixed: large batch + partial small batch
{
name: "10GB + 1MB",
datFileSize: largeBatchSize + 1*1024*1024, // 10GB + 1MB
expectedShardSize: largeBlock + smallBlock, // 1GB + 1MB
description: "1 large batch + 1MB needs 1 small block",
},
{
name: "10GB + 5MB",
datFileSize: largeBatchSize + 5*1024*1024, // 10GB + 5MB
expectedShardSize: largeBlock + smallBlock, // 1GB + 1MB
description: "1 large batch + 5MB rounds up to 1 small block",
},
{
name: "10GB + 15MB",
datFileSize: largeBatchSize + 15*1024*1024, // 10GB + 15MB
expectedShardSize: largeBlock + 2*smallBlock, // 1GB + 2MB
description: "1 large batch + 15MB needs 2 small blocks",
},
// Original test cases
{
name: "11GB (1 large batch + 103 small blocks)",
datFileSize: 11 * 1024 * 1024 * 1024, // 11GB
expectedShardSize: 1*1024*1024*1024 + 103*1024*1024, // 1GB + 103MB (103 small blocks for 1GB remaining)
description: "1GB large + 1GB remaining needs 103 small blocks",
},
{
name: "5MB (requires 1 small block per shard)",
datFileSize: 5 * 1024 * 1024, // 5MB
expectedShardSize: 1 * 1024 * 1024, // 1MB per shard (rounded up)
description: "Small file rounds up to 1MB per shard",
},
{
name: "1KB (minimum size)",
datFileSize: 1024,
expectedShardSize: 1 * 1024 * 1024, // 1MB per shard (1 small block)
description: "Tiny file needs 1 small block",
},
{
name: "10.5GB (mixed)",
datFileSize: 10*1024*1024*1024 + 512*1024*1024, // 10.5GB
expectedShardSize: 1*1024*1024*1024 + 52*1024*1024, // 1GB + 52MB (52 small blocks for 512MB remaining)
description: "1GB large + 512MB remaining needs 52 small blocks",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actualShardSize := calculateExpectedShardSize(tt.datFileSize)
if actualShardSize != tt.expectedShardSize {
t.Errorf("Expected shard size %d, got %d. %s",
tt.expectedShardSize, actualShardSize, tt.description)
}
t.Logf("✓ File size: %d → Shard size: %d (%s)",
tt.datFileSize, actualShardSize, tt.description)
})
}
}
// TestShardSizeValidationScenarios tests realistic scenarios
func TestShardSizeValidationScenarios(t *testing.T) {
scenarios := []struct {
name string
datFileSize int64
actualShardSize int64
shouldBeValid bool
}{
{
name: "Valid: exact match for 10GB",
datFileSize: 10 * 1024 * 1024 * 1024, // 10GB
actualShardSize: 1 * 1024 * 1024 * 1024, // 1GB (exact)
shouldBeValid: true,
},
{
name: "Invalid: 1 byte too small",
datFileSize: 10 * 1024 * 1024 * 1024, // 10GB
actualShardSize: 1*1024*1024*1024 - 1, // 1GB - 1 byte
shouldBeValid: false,
},
{
name: "Invalid: 1 byte too large",
datFileSize: 10 * 1024 * 1024 * 1024, // 10GB
actualShardSize: 1*1024*1024*1024 + 1, // 1GB + 1 byte
shouldBeValid: false,
},
{
name: "Valid: small file exact match",
datFileSize: 5 * 1024 * 1024, // 5MB
actualShardSize: 1 * 1024 * 1024, // 1MB (exact)
shouldBeValid: true,
},
{
name: "Invalid: wrong size for small file",
datFileSize: 5 * 1024 * 1024, // 5MB
actualShardSize: 500 * 1024, // 500KB (too small)
shouldBeValid: false,
},
}
for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
expectedSize := calculateExpectedShardSize(scenario.datFileSize)
isValid := scenario.actualShardSize == expectedSize
if isValid != scenario.shouldBeValid {
t.Errorf("Expected validation result %v, got %v. Actual shard: %d, Expected: %d",
scenario.shouldBeValid, isValid, scenario.actualShardSize, expectedSize)
}
})
}
}

643  weed/storage/disk_location_ec_test.go

@@ -0,0 +1,643 @@
package storage
import (
"os"
"path/filepath"
"testing"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// TestIncompleteEcEncodingCleanup tests the cleanup logic for incomplete EC encoding scenarios
func TestIncompleteEcEncodingCleanup(t *testing.T) {
tests := []struct {
name string
volumeId needle.VolumeId
collection string
createDatFile bool
createEcxFile bool
createEcjFile bool
numShards int
expectCleanup bool
expectLoadSuccess bool
}{
{
name: "Incomplete EC: shards without .ecx, .dat exists - should cleanup",
volumeId: 100,
collection: "",
createDatFile: true,
createEcxFile: false,
createEcjFile: false,
numShards: 14, // All shards but no .ecx
expectCleanup: true,
expectLoadSuccess: false,
},
{
name: "Distributed EC: shards without .ecx, .dat deleted - should NOT cleanup",
volumeId: 101,
collection: "",
createDatFile: false,
createEcxFile: false,
createEcjFile: false,
numShards: 5, // Partial shards, distributed
expectCleanup: false,
expectLoadSuccess: false,
},
{
name: "Incomplete EC: shards with .ecx but < 10 shards, .dat exists - should cleanup",
volumeId: 102,
collection: "",
createDatFile: true,
createEcxFile: true,
createEcjFile: false,
numShards: 7, // Less than DataShardsCount (10)
expectCleanup: true,
expectLoadSuccess: false,
},
{
name: "Valid local EC: shards with .ecx, >= 10 shards, .dat exists - should load",
volumeId: 103,
collection: "",
createDatFile: true,
createEcxFile: true,
createEcjFile: false,
numShards: 14, // All shards
expectCleanup: false,
expectLoadSuccess: true, // Would succeed if .ecx was valid
},
{
name: "Distributed EC: shards with .ecx, .dat deleted - should load",
volumeId: 104,
collection: "",
createDatFile: false,
createEcxFile: true,
createEcjFile: false,
numShards: 10, // Enough shards
expectCleanup: false,
expectLoadSuccess: true, // Would succeed if .ecx was valid
},
{
name: "Incomplete EC with collection: shards without .ecx, .dat exists - should cleanup",
volumeId: 105,
collection: "test_collection",
createDatFile: true,
createEcxFile: false,
createEcjFile: false,
numShards: 14,
expectCleanup: true,
expectLoadSuccess: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Use per-subtest temp directory for stronger isolation
tempDir := t.TempDir()
// Create DiskLocation
minFreeSpace := util.MinFreeSpace{Type: util.AsPercent, Percent: 1, Raw: "1"}
diskLocation := &DiskLocation{
Directory: tempDir,
DirectoryUuid: "test-uuid",
IdxDirectory: tempDir,
DiskType: types.HddType,
MaxVolumeCount: 100,
OriginalMaxVolumeCount: 100,
MinFreeSpace: minFreeSpace,
}
diskLocation.volumes = make(map[needle.VolumeId]*Volume)
diskLocation.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume)
// Setup test files
baseFileName := erasure_coding.EcShardFileName(tt.collection, tempDir, int(tt.volumeId))
// Use deterministic but small size: 10MB .dat => 1MB per shard
datFileSize := int64(10 * 1024 * 1024) // 10MB
expectedShardSize := calculateExpectedShardSize(datFileSize)
// Create .dat file if needed
if tt.createDatFile {
datFile, err := os.Create(baseFileName + ".dat")
if err != nil {
t.Fatalf("Failed to create .dat file: %v", err)
}
if err := datFile.Truncate(datFileSize); err != nil {
t.Fatalf("Failed to truncate .dat file: %v", err)
}
if err := datFile.Close(); err != nil {
t.Fatalf("Failed to close .dat file: %v", err)
}
}
// Create EC shard files
for i := 0; i < tt.numShards; i++ {
shardFile, err := os.Create(baseFileName + erasure_coding.ToExt(i))
if err != nil {
t.Fatalf("Failed to create shard file: %v", err)
}
if err := shardFile.Truncate(expectedShardSize); err != nil {
t.Fatalf("Failed to truncate shard file: %v", err)
}
if err := shardFile.Close(); err != nil {
t.Fatalf("Failed to close shard file: %v", err)
}
}
// Create .ecx file if needed
if tt.createEcxFile {
ecxFile, err := os.Create(baseFileName + ".ecx")
if err != nil {
t.Fatalf("Failed to create .ecx file: %v", err)
}
if _, err := ecxFile.WriteString("dummy ecx data"); err != nil {
ecxFile.Close()
t.Fatalf("Failed to write .ecx file: %v", err)
}
if err := ecxFile.Close(); err != nil {
t.Fatalf("Failed to close .ecx file: %v", err)
}
}
// Create .ecj file if needed
if tt.createEcjFile {
ecjFile, err := os.Create(baseFileName + ".ecj")
if err != nil {
t.Fatalf("Failed to create .ecj file: %v", err)
}
if _, err := ecjFile.WriteString("dummy ecj data"); err != nil {
ecjFile.Close()
t.Fatalf("Failed to write .ecj file: %v", err)
}
if err := ecjFile.Close(); err != nil {
t.Fatalf("Failed to close .ecj file: %v", err)
}
}
// Run loadAllEcShards
loadErr := diskLocation.loadAllEcShards()
if loadErr != nil {
t.Logf("loadAllEcShards returned error (expected in some cases): %v", loadErr)
}
// Test idempotency - running again should not cause issues
loadErr2 := diskLocation.loadAllEcShards()
if loadErr2 != nil {
t.Logf("Second loadAllEcShards returned error: %v", loadErr2)
}
// Verify cleanup expectations
if tt.expectCleanup {
// Check that files were cleaned up
if util.FileExists(baseFileName + ".ecx") {
t.Errorf("Expected .ecx to be cleaned up but it still exists")
}
if util.FileExists(baseFileName + ".ecj") {
t.Errorf("Expected .ecj to be cleaned up but it still exists")
}
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
shardFile := baseFileName + erasure_coding.ToExt(i)
if util.FileExists(shardFile) {
t.Errorf("Expected shard %d to be cleaned up but it still exists", i)
}
}
// .dat file should still exist (not cleaned up)
if tt.createDatFile && !util.FileExists(baseFileName+".dat") {
t.Errorf("Expected .dat file to remain but it was deleted")
}
} else {
// Check that files were NOT cleaned up
for i := 0; i < tt.numShards; i++ {
shardFile := baseFileName + erasure_coding.ToExt(i)
if !util.FileExists(shardFile) {
t.Errorf("Expected shard %d to remain but it was cleaned up", i)
}
}
if tt.createEcxFile && !util.FileExists(baseFileName+".ecx") {
t.Errorf("Expected .ecx to remain but it was cleaned up")
}
}
// Verify load expectations
if tt.expectLoadSuccess {
if diskLocation.EcShardCount() == 0 {
t.Errorf("Expected EC shards to be loaded for volume %d", tt.volumeId)
}
}
})
}
}
// TestValidateEcVolume tests the validateEcVolume function
func TestValidateEcVolume(t *testing.T) {
tempDir := t.TempDir()
minFreeSpace := util.MinFreeSpace{Type: util.AsPercent, Percent: 1, Raw: "1"}
diskLocation := &DiskLocation{
Directory: tempDir,
DirectoryUuid: "test-uuid",
IdxDirectory: tempDir,
DiskType: types.HddType,
MinFreeSpace: minFreeSpace,
}
tests := []struct {
name string
volumeId needle.VolumeId
collection string
createDatFile bool
numShards int
expectValid bool
}{
{
name: "Valid: .dat exists with 10+ shards",
volumeId: 200,
collection: "",
createDatFile: true,
numShards: 10,
expectValid: true,
},
{
name: "Invalid: .dat exists with < 10 shards",
volumeId: 201,
collection: "",
createDatFile: true,
numShards: 9,
expectValid: false,
},
{
name: "Valid: .dat deleted (distributed EC) with any shards",
volumeId: 202,
collection: "",
createDatFile: false,
numShards: 5,
expectValid: true,
},
{
name: "Valid: .dat deleted (distributed EC) with no shards",
volumeId: 203,
collection: "",
createDatFile: false,
numShards: 0,
expectValid: true,
},
{
name: "Invalid: zero-byte shard files should not count",
volumeId: 204,
collection: "",
createDatFile: true,
numShards: 0, // Will create 10 zero-byte files below
expectValid: false,
},
{
name: "Invalid: .dat exists with different size shards",
volumeId: 205,
collection: "",
createDatFile: true,
numShards: 10, // Will create shards with varying sizes
expectValid: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
baseFileName := erasure_coding.EcShardFileName(tt.collection, tempDir, int(tt.volumeId))
// For proper testing, we need to use realistic sizes that match EC encoding
// EC uses large blocks (1GB) and small blocks (1MB)
// For test purposes, use a small .dat file size that still exercises the logic
// 10MB .dat file = 1MB per shard (one small batch, fast and deterministic)
datFileSize := int64(10 * 1024 * 1024) // 10MB
expectedShardSize := calculateExpectedShardSize(datFileSize)
// Create .dat file if needed
if tt.createDatFile {
datFile, err := os.Create(baseFileName + ".dat")
if err != nil {
t.Fatalf("Failed to create .dat file: %v", err)
}
// Truncate to the target size (sparse file); no real data needs to be written for tests
datFile.Truncate(datFileSize)
datFile.Close()
}
// Create EC shard files with correct size
for i := 0; i < tt.numShards; i++ {
shardFile, err := os.Create(baseFileName + erasure_coding.ToExt(i))
if err != nil {
t.Fatalf("Failed to create shard file: %v", err)
}
// Use truncate to create file of correct size without allocating all the space
if err := shardFile.Truncate(expectedShardSize); err != nil {
shardFile.Close()
t.Fatalf("Failed to truncate shard file: %v", err)
}
if err := shardFile.Close(); err != nil {
t.Fatalf("Failed to close shard file: %v", err)
}
}
// For zero-byte test case, create empty files for all data shards
if tt.volumeId == 204 {
for i := 0; i < erasure_coding.DataShardsCount; i++ {
shardFile, err := os.Create(baseFileName + erasure_coding.ToExt(i))
if err != nil {
t.Fatalf("Failed to create empty shard file: %v", err)
}
// Don't write anything - leave as zero-byte
shardFile.Close()
}
}
// For mismatched shard size test case, create shards with different sizes
if tt.volumeId == 205 {
for i := 0; i < erasure_coding.DataShardsCount; i++ {
shardFile, err := os.Create(baseFileName + erasure_coding.ToExt(i))
if err != nil {
t.Fatalf("Failed to create shard file: %v", err)
}
// Write different amount of data to each shard
data := make([]byte, 100+i*10)
shardFile.Write(data)
shardFile.Close()
}
}
// Test validation
isValid := diskLocation.validateEcVolume(tt.collection, tt.volumeId)
if isValid != tt.expectValid {
t.Errorf("Expected validation result %v but got %v", tt.expectValid, isValid)
}
})
}
}
// TestRemoveEcVolumeFiles tests the removeEcVolumeFiles function
func TestRemoveEcVolumeFiles(t *testing.T) {
tests := []struct {
name string
separateIdxDir bool
}{
{"Same directory for data and index", false},
{"Separate idx directory", true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tempDir := t.TempDir()
var dataDir, idxDir string
if tt.separateIdxDir {
dataDir = filepath.Join(tempDir, "data")
idxDir = filepath.Join(tempDir, "idx")
os.MkdirAll(dataDir, 0755)
os.MkdirAll(idxDir, 0755)
} else {
dataDir = tempDir
idxDir = tempDir
}
minFreeSpace := util.MinFreeSpace{Type: util.AsPercent, Percent: 1, Raw: "1"}
diskLocation := &DiskLocation{
Directory: dataDir,
DirectoryUuid: "test-uuid",
IdxDirectory: idxDir,
DiskType: types.HddType,
MinFreeSpace: minFreeSpace,
}
volumeId := needle.VolumeId(300)
collection := ""
dataBaseFileName := erasure_coding.EcShardFileName(collection, dataDir, int(volumeId))
idxBaseFileName := erasure_coding.EcShardFileName(collection, idxDir, int(volumeId))
// Create all EC shard files in data directory
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
shardFile, err := os.Create(dataBaseFileName + erasure_coding.ToExt(i))
if err != nil {
t.Fatalf("Failed to create shard file: %v", err)
}
if _, err := shardFile.WriteString("dummy shard data"); err != nil {
shardFile.Close()
t.Fatalf("Failed to write shard file: %v", err)
}
if err := shardFile.Close(); err != nil {
t.Fatalf("Failed to close shard file: %v", err)
}
}
// Create .ecx file in idx directory
ecxFile, err := os.Create(idxBaseFileName + ".ecx")
if err != nil {
t.Fatalf("Failed to create .ecx file: %v", err)
}
if _, err := ecxFile.WriteString("dummy ecx data"); err != nil {
ecxFile.Close()
t.Fatalf("Failed to write .ecx file: %v", err)
}
if err := ecxFile.Close(); err != nil {
t.Fatalf("Failed to close .ecx file: %v", err)
}
// Create .ecj file in idx directory
ecjFile, err := os.Create(idxBaseFileName + ".ecj")
if err != nil {
t.Fatalf("Failed to create .ecj file: %v", err)
}
if _, err := ecjFile.WriteString("dummy ecj data"); err != nil {
ecjFile.Close()
t.Fatalf("Failed to write .ecj file: %v", err)
}
if err := ecjFile.Close(); err != nil {
t.Fatalf("Failed to close .ecj file: %v", err)
}
// Create .dat file in data directory (should NOT be removed)
datFile, err := os.Create(dataBaseFileName + ".dat")
if err != nil {
t.Fatalf("Failed to create .dat file: %v", err)
}
if _, err := datFile.WriteString("dummy dat data"); err != nil {
datFile.Close()
t.Fatalf("Failed to write .dat file: %v", err)
}
if err := datFile.Close(); err != nil {
t.Fatalf("Failed to close .dat file: %v", err)
}
// Call removeEcVolumeFiles
diskLocation.removeEcVolumeFiles(collection, volumeId)
// Verify all EC shard files are removed from data directory
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
shardFile := dataBaseFileName + erasure_coding.ToExt(i)
if util.FileExists(shardFile) {
t.Errorf("Shard file %d should be removed but still exists", i)
}
}
// Verify .ecx file is removed from idx directory
if util.FileExists(idxBaseFileName + ".ecx") {
t.Errorf(".ecx file should be removed but still exists")
}
// Verify .ecj file is removed from idx directory
if util.FileExists(idxBaseFileName + ".ecj") {
t.Errorf(".ecj file should be removed but still exists")
}
// Verify .dat file is NOT removed from data directory
if !util.FileExists(dataBaseFileName + ".dat") {
t.Errorf(".dat file should NOT be removed but was deleted")
}
})
}
}
// TestEcCleanupWithSeparateIdxDirectory tests EC cleanup when idx directory is different
func TestEcCleanupWithSeparateIdxDirectory(t *testing.T) {
tempDir := t.TempDir()
idxDir := filepath.Join(tempDir, "idx")
dataDir := filepath.Join(tempDir, "data")
os.MkdirAll(idxDir, 0755)
os.MkdirAll(dataDir, 0755)
minFreeSpace := util.MinFreeSpace{Type: util.AsPercent, Percent: 1, Raw: "1"}
diskLocation := &DiskLocation{
Directory: dataDir,
DirectoryUuid: "test-uuid",
IdxDirectory: idxDir,
DiskType: types.HddType,
MinFreeSpace: minFreeSpace,
}
diskLocation.volumes = make(map[needle.VolumeId]*Volume)
diskLocation.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume)
volumeId := needle.VolumeId(400)
collection := ""
// Create shards in data directory (shards only go to Directory, not IdxDirectory)
dataBaseFileName := erasure_coding.EcShardFileName(collection, dataDir, int(volumeId))
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
shardFile, err := os.Create(dataBaseFileName + erasure_coding.ToExt(i))
if err != nil {
t.Fatalf("Failed to create shard file: %v", err)
}
if _, err := shardFile.WriteString("dummy shard data"); err != nil {
t.Fatalf("Failed to write shard file: %v", err)
}
if err := shardFile.Close(); err != nil {
t.Fatalf("Failed to close shard file: %v", err)
}
}
// Create .dat in data directory
datFile, err := os.Create(dataBaseFileName + ".dat")
if err != nil {
t.Fatalf("Failed to create .dat file: %v", err)
}
if _, err := datFile.WriteString("dummy data"); err != nil {
t.Fatalf("Failed to write .dat file: %v", err)
}
if err := datFile.Close(); err != nil {
t.Fatalf("Failed to close .dat file: %v", err)
}
// Do not create .ecx: trigger orphaned-shards cleanup when .dat exists
// Run loadAllEcShards
loadErr := diskLocation.loadAllEcShards()
if loadErr != nil {
t.Logf("loadAllEcShards error: %v", loadErr)
}
// Verify cleanup occurred in data directory (shards)
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
shardFile := dataBaseFileName + erasure_coding.ToExt(i)
if util.FileExists(shardFile) {
t.Errorf("Shard file %d should be cleaned up but still exists", i)
}
}
// Verify .dat in data directory still exists (only EC files are cleaned up)
if !util.FileExists(dataBaseFileName + ".dat") {
t.Errorf(".dat file should remain but was deleted")
}
}
// TestDistributedEcVolumeNoFileDeletion verifies that distributed EC volumes
// (where .dat is deleted) do NOT have their shard files deleted when load fails
// This tests the critical bug fix where DestroyEcVolume was incorrectly deleting files
func TestDistributedEcVolumeNoFileDeletion(t *testing.T) {
tempDir := t.TempDir()
minFreeSpace := util.MinFreeSpace{Type: util.AsPercent, Percent: 1, Raw: "1"}
diskLocation := &DiskLocation{
Directory: tempDir,
DirectoryUuid: "test-uuid",
IdxDirectory: tempDir,
DiskType: types.HddType,
MinFreeSpace: minFreeSpace,
ecVolumes: make(map[needle.VolumeId]*erasure_coding.EcVolume),
}
collection := ""
volumeId := needle.VolumeId(500)
baseFileName := erasure_coding.EcShardFileName(collection, tempDir, int(volumeId))
// Create EC shards (only 5 shards - less than DataShardsCount, but OK for distributed EC)
numDistributedShards := 5
for i := 0; i < numDistributedShards; i++ {
shardFile, err := os.Create(baseFileName + erasure_coding.ToExt(i))
if err != nil {
t.Fatalf("Failed to create shard file: %v", err)
}
if _, err := shardFile.WriteString("dummy shard data"); err != nil {
shardFile.Close()
t.Fatalf("Failed to write shard file: %v", err)
}
if err := shardFile.Close(); err != nil {
t.Fatalf("Failed to close shard file: %v", err)
}
}
// Create .ecx file to trigger EC loading
ecxFile, err := os.Create(baseFileName + ".ecx")
if err != nil {
t.Fatalf("Failed to create .ecx file: %v", err)
}
if _, err := ecxFile.WriteString("dummy ecx data"); err != nil {
ecxFile.Close()
t.Fatalf("Failed to write .ecx file: %v", err)
}
if err := ecxFile.Close(); err != nil {
t.Fatalf("Failed to close .ecx file: %v", err)
}
// NO .dat file - this is a distributed EC volume
// Run loadAllEcShards - this should fail but NOT delete shard files
loadErr := diskLocation.loadAllEcShards()
if loadErr != nil {
t.Logf("loadAllEcShards returned error (expected): %v", loadErr)
}
// CRITICAL CHECK: Verify shard files still exist (should NOT be deleted)
for i := 0; i < 5; i++ {
shardFile := baseFileName + erasure_coding.ToExt(i)
if !util.FileExists(shardFile) {
t.Errorf("CRITICAL BUG: Shard file %s was deleted for distributed EC volume!", shardFile)
}
}
// Verify .ecx file still exists (should NOT be deleted for distributed EC)
if !util.FileExists(baseFileName + ".ecx") {
t.Errorf("CRITICAL BUG: .ecx file was deleted for distributed EC volume!")
}
t.Logf("SUCCESS: Distributed EC volume files preserved (not deleted)")
}