|
|
|
@ -197,6 +197,8 @@ func (l *DiskLocation) loadAllEcShards() (err error) { |
|
|
|
if prevVolumeId == 0 || (volumeId == prevVolumeId && collection == prevCollection) { |
|
|
|
sameVolumeShards = append(sameVolumeShards, fileInfo.Name()) |
|
|
|
} else { |
|
|
|
// Before starting a new group, check if previous group had orphaned shards
|
|
|
|
l.checkOrphanedShards(sameVolumeShards, prevVolumeId) |
|
|
|
sameVolumeShards = []string{fileInfo.Name()} |
|
|
|
} |
|
|
|
prevVolumeId = volumeId |
|
|
|
@ -251,16 +253,9 @@ func (l *DiskLocation) loadAllEcShards() (err error) { |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// Check for orphaned EC shards without .ecx file (incomplete EC encoding)
|
|
|
|
// This happens when encoding is interrupted after writing shards but before writing .ecx
|
|
|
|
if len(sameVolumeShards) > 0 && prevVolumeId != 0 { |
|
|
|
// We have collected EC shards but never found .ecx file
|
|
|
|
// Need to determine the collection name from the shard filenames
|
|
|
|
baseName := sameVolumeShards[0][:len(sameVolumeShards[0])-len(path.Ext(sameVolumeShards[0]))] |
|
|
|
if collection, volumeId, err := parseCollectionVolumeId(baseName); err == nil && volumeId == prevVolumeId { |
|
|
|
l.cleanupIfIncomplete(collection, volumeId, len(sameVolumeShards)) |
|
|
|
} |
|
|
|
} |
|
|
|
// Check for orphaned EC shards without .ecx file at the end of the directory scan
|
|
|
|
// This handles the last group of shards in the directory
|
|
|
|
l.checkOrphanedShards(sameVolumeShards, prevVolumeId) |
|
|
|
|
|
|
|
return nil |
|
|
|
} |
|
|
|
@ -304,18 +299,34 @@ func (l *DiskLocation) EcShardCount() int { |
|
|
|
return shardCount |
|
|
|
} |
|
|
|
|
|
|
|
// cleanupIfIncomplete removes EC files when .dat exists but .ecx is missing for a volume.
|
|
|
|
// checkOrphanedShards checks if the given shards are orphaned (no .ecx file) and cleans them up if needed.
|
|
|
|
// Returns true if orphaned shards were found and cleaned up.
|
|
|
|
// This handles the case where EC encoding was interrupted before creating the .ecx file.
|
|
|
|
func (l *DiskLocation) cleanupIfIncomplete(collection string, vid needle.VolumeId, shardCount int) { |
|
|
|
baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(vid)) |
|
|
|
func (l *DiskLocation) checkOrphanedShards(shards []string, prevVolumeId needle.VolumeId) bool { |
|
|
|
if len(shards) == 0 || prevVolumeId == 0 { |
|
|
|
return false |
|
|
|
} |
|
|
|
|
|
|
|
// Parse collection and volumeId from the first shard filename
|
|
|
|
baseName := shards[0][:len(shards[0])-len(path.Ext(shards[0]))] |
|
|
|
collection, volumeId, err := parseCollectionVolumeId(baseName) |
|
|
|
if err != nil || volumeId != prevVolumeId { |
|
|
|
return false |
|
|
|
} |
|
|
|
|
|
|
|
// Check if .dat file exists (incomplete encoding, not distributed EC)
|
|
|
|
// If the .dat file exists, this is not a distributed EC volume, so clean up the orphaned shards
|
|
|
|
baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(volumeId)) |
|
|
|
datFileName := baseFileName + ".dat" |
|
|
|
if util.FileExists(datFileName) { |
|
|
|
glog.Warningf("Found %d EC shards without .ecx file for volume %d (incomplete encoding interrupted before .ecx creation), cleaning up...", |
|
|
|
shardCount, vid) |
|
|
|
l.removeEcVolumeFiles(collection, vid) |
|
|
|
len(shards), volumeId) |
|
|
|
l.removeEcVolumeFiles(collection, volumeId) |
|
|
|
// Clean up any in-memory state. This does not delete files (already deleted by removeEcVolumeFiles).
|
|
|
|
l.unloadEcVolume(vid) |
|
|
|
l.unloadEcVolume(volumeId) |
|
|
|
return true |
|
|
|
} |
|
|
|
return false |
|
|
|
} |
|
|
|
|
|
|
|
// calculateExpectedShardSize computes the exact expected shard size based on .dat file size
|
|
|
|
|