|
|
|
@ -172,9 +172,18 @@ func (l *DiskLocation) loadAllEcShards() (err error) { |
|
|
|
slices.SortFunc(dirEntries, func(a, b os.DirEntry) int { |
|
|
|
return strings.Compare(a.Name(), b.Name()) |
|
|
|
}) |
|
|
|
|
|
|
|
var sameVolumeShards []string |
|
|
|
var prevVolumeId needle.VolumeId |
|
|
|
var prevCollection string |
|
|
|
|
|
|
|
// Helper to reset state between volume processing
|
|
|
|
reset := func() { |
|
|
|
sameVolumeShards = nil |
|
|
|
prevVolumeId = 0 |
|
|
|
prevCollection = "" |
|
|
|
} |
|
|
|
|
|
|
|
for _, fileInfo := range dirEntries { |
|
|
|
if fileInfo.IsDir() { |
|
|
|
continue |
|
|
|
@ -216,22 +225,9 @@ func (l *DiskLocation) loadAllEcShards() (err error) { |
|
|
|
// If .dat file is gone, this is likely a distributed EC volume with shards on multiple servers
|
|
|
|
baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(volumeId)) |
|
|
|
datFileName := baseFileName + ".dat" |
|
|
|
// Helper to reset state between volume processing
|
|
|
|
reset := func() { |
|
|
|
sameVolumeShards = nil |
|
|
|
prevVolumeId = 0 |
|
|
|
prevCollection = "" |
|
|
|
} |
|
|
|
|
|
|
|
// Determine .dat presence robustly; unexpected errors are treated as "exists"
|
|
|
|
datExists := false |
|
|
|
if _, err := os.Stat(datFileName); err == nil { |
|
|
|
datExists = true |
|
|
|
} else if !os.IsNotExist(err) { |
|
|
|
glog.Warningf("Failed to stat .dat file %s: %v", datFileName, err) |
|
|
|
// Safer to assume local .dat exists to avoid misclassifying as distributed EC
|
|
|
|
datExists = true |
|
|
|
} |
|
|
|
datExists := l.checkDatFileExists(datFileName) |
|
|
|
|
|
|
|
// Validate EC volume if .dat file exists (incomplete EC encoding scenario)
|
|
|
|
// This checks shard count, shard size consistency, and expected size vs .dat file
|
|
|
|
@ -311,6 +307,20 @@ func (l *DiskLocation) EcShardCount() int { |
|
|
|
return shardCount |
|
|
|
} |
|
|
|
|
|
|
|
// checkDatFileExists checks if .dat file exists with robust error handling.
|
|
|
|
// Unexpected errors (permission, I/O) are treated as "exists" to avoid misclassifying
|
|
|
|
// local EC as distributed EC, which is the safer fallback.
|
|
|
|
func (l *DiskLocation) checkDatFileExists(datFileName string) bool { |
|
|
|
if _, err := os.Stat(datFileName); err == nil { |
|
|
|
return true |
|
|
|
} else if !os.IsNotExist(err) { |
|
|
|
glog.Warningf("Failed to stat .dat file %s: %v", datFileName, err) |
|
|
|
// Safer to assume local .dat exists to avoid misclassifying as distributed EC
|
|
|
|
return true |
|
|
|
} |
|
|
|
return false |
|
|
|
} |
|
|
|
|
|
|
|
// checkOrphanedShards checks if the given shards are orphaned (no .ecx file) and cleans them up if needed.
|
|
|
|
// Returns true if orphaned shards were found and cleaned up.
|
|
|
|
// This handles the case where EC encoding was interrupted before creating the .ecx file.
|
|
|
|
@ -320,20 +330,10 @@ func (l *DiskLocation) checkOrphanedShards(shards []string, collection string, v |
|
|
|
} |
|
|
|
|
|
|
|
// Check if .dat file exists (incomplete encoding, not distributed EC)
|
|
|
|
// Use os.Stat for robust error handling; unexpected errors treated as "exists"
|
|
|
|
baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(volumeId)) |
|
|
|
datFileName := baseFileName + ".dat" |
|
|
|
|
|
|
|
datExists := false |
|
|
|
if _, err := os.Stat(datFileName); err == nil { |
|
|
|
datExists = true |
|
|
|
} else if !os.IsNotExist(err) { |
|
|
|
glog.Warningf("Failed to stat .dat file %s: %v", datFileName, err) |
|
|
|
// Safer to assume local .dat exists to avoid misclassifying as distributed EC
|
|
|
|
datExists = true |
|
|
|
} |
|
|
|
|
|
|
|
if datExists { |
|
|
|
if l.checkDatFileExists(datFileName) { |
|
|
|
glog.Warningf("Found %d EC shards without .ecx file for volume %d (incomplete encoding interrupted before .ecx creation), cleaning up...", |
|
|
|
len(shards), volumeId) |
|
|
|
l.removeEcVolumeFiles(collection, volumeId) |
|
|
|
|