Browse Source

Eliminated Redundant Parsing in checkOrphanedShards

pull/7384/head
chrislu 1 month ago
parent
commit
d07b24e42c
  1. 18
      weed/storage/disk_location_ec.go

18
weed/storage/disk_location_ec.go

@ -198,7 +198,7 @@ func (l *DiskLocation) loadAllEcShards() (err error) {
sameVolumeShards = append(sameVolumeShards, fileInfo.Name()) sameVolumeShards = append(sameVolumeShards, fileInfo.Name())
} else { } else {
// Before starting a new group, check if previous group had orphaned shards // Before starting a new group, check if previous group had orphaned shards
l.checkOrphanedShards(sameVolumeShards, prevVolumeId)
l.checkOrphanedShards(sameVolumeShards, prevCollection, prevVolumeId)
sameVolumeShards = []string{fileInfo.Name()} sameVolumeShards = []string{fileInfo.Name()}
} }
prevVolumeId = volumeId prevVolumeId = volumeId
@ -255,7 +255,7 @@ func (l *DiskLocation) loadAllEcShards() (err error) {
// Check for orphaned EC shards without .ecx file at the end of the directory scan // Check for orphaned EC shards without .ecx file at the end of the directory scan
// This handles the last group of shards in the directory // This handles the last group of shards in the directory
l.checkOrphanedShards(sameVolumeShards, prevVolumeId)
l.checkOrphanedShards(sameVolumeShards, prevCollection, prevVolumeId)
return nil return nil
} }
@ -302,15 +302,8 @@ func (l *DiskLocation) EcShardCount() int {
// checkOrphanedShards checks if the given shards are orphaned (no .ecx file) and cleans them up if needed. // checkOrphanedShards checks if the given shards are orphaned (no .ecx file) and cleans them up if needed.
// Returns true if orphaned shards were found and cleaned up. // Returns true if orphaned shards were found and cleaned up.
// This handles the case where EC encoding was interrupted before creating the .ecx file. // This handles the case where EC encoding was interrupted before creating the .ecx file.
func (l *DiskLocation) checkOrphanedShards(shards []string, prevVolumeId needle.VolumeId) bool {
if len(shards) == 0 || prevVolumeId == 0 {
return false
}
// Parse collection and volumeId from the first shard filename
baseName := shards[0][:len(shards[0])-len(path.Ext(shards[0]))]
collection, volumeId, err := parseCollectionVolumeId(baseName)
if err != nil || volumeId != prevVolumeId {
func (l *DiskLocation) checkOrphanedShards(shards []string, collection string, volumeId needle.VolumeId) bool {
if len(shards) == 0 || volumeId == 0 {
return false return false
} }
@ -399,7 +392,10 @@ func (l *DiskLocation) validateEcVolume(collection string, vid needle.VolumeId)
shardCount++ shardCount++
} }
} else if !os.IsNotExist(err) { } else if !os.IsNotExist(err) {
// If stat fails with unexpected error (permission, I/O), fail validation
// This is consistent with .dat file error handling
glog.Warningf("Failed to stat shard file %s: %v", shardFileName, err) glog.Warningf("Failed to stat shard file %s: %v", shardFileName, err)
return false
} }
} }

Loading…
Cancel
Save