|
|
@ -208,29 +208,29 @@ func (l *DiskLocation) loadAllEcShards() (err error) { |
|
|
continue |
|
|
continue |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if err = l.loadEcShards(sameVolumeShards, collection, volumeId); err != nil { |
|
|
|
|
|
// If EC shards failed to load and .dat still exists, clean up EC files to allow .dat file to be used
|
|
|
|
|
|
// If .dat is gone, log error but don't clean up (may be waiting for shards from other servers)
|
|
|
|
|
|
if datExists { |
|
|
|
|
|
glog.Warningf("Failed to load EC shards for volume %d and .dat exists: %v, cleaning up EC files to use .dat...", volumeId, err) |
|
|
|
|
|
// Clean up any partially loaded in-memory state before removing files
|
|
|
|
|
|
l.DestroyEcVolume(volumeId) |
|
|
|
|
|
l.removeEcVolumeFiles(collection, volumeId) |
|
|
|
|
|
} else { |
|
|
|
|
|
glog.Warningf("Failed to load EC shards for volume %d: %v (this may be normal for distributed EC volumes)", volumeId, err) |
|
|
|
|
|
// Clean up any partially loaded in-memory state even if we don't remove files
|
|
|
|
|
|
l.DestroyEcVolume(volumeId) |
|
|
|
|
|
|
|
|
if err = l.loadEcShards(sameVolumeShards, collection, volumeId); err != nil { |
|
|
|
|
|
// If EC shards failed to load and .dat still exists, clean up EC files to allow .dat file to be used
|
|
|
|
|
|
// If .dat is gone, log error but don't clean up (may be waiting for shards from other servers)
|
|
|
|
|
|
if datExists { |
|
|
|
|
|
glog.Warningf("Failed to load EC shards for volume %d and .dat exists: %v, cleaning up EC files to use .dat...", volumeId, err) |
|
|
|
|
|
// Clean up any partially loaded in-memory state before removing files
|
|
|
|
|
|
l.DestroyEcVolume(volumeId) |
|
|
|
|
|
l.removeEcVolumeFiles(collection, volumeId) |
|
|
|
|
|
} else { |
|
|
|
|
|
glog.Warningf("Failed to load EC shards for volume %d: %v (this may be normal for distributed EC volumes)", volumeId, err) |
|
|
|
|
|
// Clean up any partially loaded in-memory state even if we don't remove files
|
|
|
|
|
|
l.DestroyEcVolume(volumeId) |
|
|
|
|
|
} |
|
|
|
|
|
sameVolumeShards = nil |
|
|
|
|
|
prevVolumeId = 0 |
|
|
|
|
|
continue |
|
|
} |
|
|
} |
|
|
|
|
|
prevVolumeId = volumeId |
|
|
sameVolumeShards = nil |
|
|
sameVolumeShards = nil |
|
|
prevVolumeId = 0 |
|
|
|
|
|
continue |
|
|
continue |
|
|
} |
|
|
} |
|
|
prevVolumeId = volumeId |
|
|
|
|
|
sameVolumeShards = nil |
|
|
|
|
|
continue |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// Check for orphaned EC shards without .ecx file (incomplete EC encoding)
|
|
|
// Check for orphaned EC shards without .ecx file (incomplete EC encoding)
|
|
|
// This happens when encoding is interrupted after writing shards but before writing .ecx
|
|
|
// This happens when encoding is interrupted after writing shards but before writing .ecx
|
|
|
@ -305,11 +305,13 @@ func (l *DiskLocation) validateEcVolume(collection string, vid needle.VolumeId) |
|
|
// Count existing EC shard files
|
|
|
// Count existing EC shard files
|
|
|
for i := 0; i < erasure_coding.TotalShardsCount; i++ { |
|
|
for i := 0; i < erasure_coding.TotalShardsCount; i++ { |
|
|
shardFileName := baseFileName + erasure_coding.ToExt(i) |
|
|
shardFileName := baseFileName + erasure_coding.ToExt(i) |
|
|
if util.FileExists(shardFileName) { |
|
|
|
|
|
|
|
|
if fi, err := os.Stat(shardFileName); err == nil { |
|
|
// Check if file has non-zero size
|
|
|
// Check if file has non-zero size
|
|
|
if fi, err := os.Stat(shardFileName); err == nil && fi.Size() > 0 { |
|
|
|
|
|
|
|
|
if fi.Size() > 0 { |
|
|
shardCount++ |
|
|
shardCount++ |
|
|
} |
|
|
} |
|
|
|
|
|
} else if !os.IsNotExist(err) { |
|
|
|
|
|
glog.Warningf("Failed to stat shard file %s: %v", shardFileName, err) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
@ -343,7 +345,7 @@ func (l *DiskLocation) removeEcVolumeFiles(collection string, vid needle.VolumeI |
|
|
glog.Warningf("Failed to remove incomplete EC shard file %s: %v", shardFileName, err) |
|
|
glog.Warningf("Failed to remove incomplete EC shard file %s: %v", shardFileName, err) |
|
|
} |
|
|
} |
|
|
} else { |
|
|
} else { |
|
|
glog.V(0).Infof("Removed incomplete EC shard file: %s", shardFileName) |
|
|
|
|
|
|
|
|
glog.V(2).Infof("Removed incomplete EC shard file: %s", shardFileName) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
@ -353,13 +355,13 @@ func (l *DiskLocation) removeEcVolumeFiles(collection string, vid needle.VolumeI |
|
|
glog.Warningf("Failed to remove incomplete EC index file %s.ecx: %v", indexBaseFileName, err) |
|
|
glog.Warningf("Failed to remove incomplete EC index file %s.ecx: %v", indexBaseFileName, err) |
|
|
} |
|
|
} |
|
|
} else { |
|
|
} else { |
|
|
glog.V(0).Infof("Removed incomplete EC index file: %s.ecx", indexBaseFileName) |
|
|
|
|
|
|
|
|
glog.V(2).Infof("Removed incomplete EC index file: %s.ecx", indexBaseFileName) |
|
|
} |
|
|
} |
|
|
if err := os.Remove(indexBaseFileName + ".ecj"); err != nil { |
|
|
if err := os.Remove(indexBaseFileName + ".ecj"); err != nil { |
|
|
if !os.IsNotExist(err) { |
|
|
if !os.IsNotExist(err) { |
|
|
glog.Warningf("Failed to remove incomplete EC journal file %s.ecj: %v", indexBaseFileName, err) |
|
|
glog.Warningf("Failed to remove incomplete EC journal file %s.ecj: %v", indexBaseFileName, err) |
|
|
} |
|
|
} |
|
|
} else { |
|
|
} else { |
|
|
glog.V(0).Infof("Removed incomplete EC journal file: %s.ecj", indexBaseFileName) |
|
|
|
|
|
|
|
|
glog.V(2).Infof("Removed incomplete EC journal file: %s.ecj", indexBaseFileName) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |