@@ -13,7 +13,6 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
 	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
-	"github.com/seaweedfs/seaweedfs/weed/util"
 )
 
 var (
@@ -224,7 +223,15 @@ func (l *DiskLocation) loadAllEcShards() (err error) {
 			prevCollection = ""
 		}
 
-		datExists := util.FileExists(datFileName)
+		// Determine .dat presence robustly; unexpected errors are treated as "exists"
+		datExists := false
+		if _, err := os.Stat(datFileName); err == nil {
+			datExists = true
+		} else if !os.IsNotExist(err) {
+			glog.Warningf("Failed to stat .dat file %s: %v", datFileName, err)
+			// Safer to assume local .dat exists to avoid misclassifying as distributed EC
+			datExists = true
+		}
 
 		// Validate EC volume if .dat file exists (incomplete EC encoding scenario)
 		// This checks shard count, shard size consistency, and expected size vs .dat file
@@ -232,8 +239,6 @@ func (l *DiskLocation) loadAllEcShards() (err error) {
 		if datExists && !l.validateEcVolume(collection, volumeId) {
 			glog.Warningf("Incomplete or invalid EC volume %d: .dat exists but validation failed, cleaning up EC files...", volumeId)
 			l.removeEcVolumeFiles(collection, volumeId)
-			// Clean up any in-memory state. This does not delete files (already deleted by removeEcVolumeFiles).
-			l.unloadEcVolume(volumeId)
 			reset()
 			continue
 		}
@@ -243,12 +248,14 @@ func (l *DiskLocation) loadAllEcShards() (err error) {
 			// If .dat is gone, log error but don't clean up (may be waiting for shards from other servers)
 			if datExists {
 				glog.Warningf("Failed to load EC shards for volume %d and .dat exists: %v, cleaning up EC files to use .dat...", volumeId, err)
+				// Unload first to release FDs, then remove files
+				l.unloadEcVolume(volumeId)
 				l.removeEcVolumeFiles(collection, volumeId)
 			} else {
 				glog.Warningf("Failed to load EC shards for volume %d: %v (this may be normal for distributed EC volumes)", volumeId, err)
-				// Clean up any partially loaded in-memory state. This does not delete files.
-				l.unloadEcVolume(volumeId)
 			}
+			// Clean up any partially loaded in-memory state. This does not delete files.
+			l.unloadEcVolume(volumeId)
 			reset()
 			continue
 		}
@@ -311,17 +318,25 @@ func (l *DiskLocation) checkOrphanedShards(shards []string, collection string, v
 	if len(shards) == 0 || volumeId == 0 {
 		return false
 	}
 
 	// Check if .dat file exists (incomplete encoding, not distributed EC)
 	// If .dat file exists, this is not a distributed EC volume, so cleanup the orphaned shards
+	// Use os.Stat for robust error handling; unexpected errors treated as "exists"
 	baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(volumeId))
 	datFileName := baseFileName + ".dat"
-	if util.FileExists(datFileName) {
+	datExists := false
+	if _, err := os.Stat(datFileName); err == nil {
+		datExists = true
+	} else if !os.IsNotExist(err) {
+		glog.Warningf("Failed to stat .dat file %s: %v", datFileName, err)
+		// Safer to assume local .dat exists to avoid misclassifying as distributed EC
+		datExists = true
+	}
+
+	if datExists {
 		glog.Warningf("Found %d EC shards without .ecx file for volume %d (incomplete encoding interrupted before .ecx creation), cleaning up...",
 			len(shards), volumeId)
 		l.removeEcVolumeFiles(collection, volumeId)
+		// Clean up any in-memory state. This does not delete files (already deleted by removeEcVolumeFiles).
+		l.unloadEcVolume(volumeId)
 		return true
 	}
 	return false