diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index aac824318..3da922f5b 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -144,10 +144,26 @@ func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind Ne return false } - // skip if ec volumes exists + // parse out collection, volume id (moved up to use in EC validation) + vid, collection, err := volumeIdFromFileName(basename) + if err != nil { + glog.Warningf("get volume id failed, %s, err : %s", volumeName, err) + return false + } + + // skip if ec volumes exists, but validate EC files first if skipIfEcVolumesExists { - if util.FileExists(l.IdxDirectory + "/" + volumeName + ".ecx") { - return false + ecxFilePath := l.IdxDirectory + "/" + volumeName + ".ecx" + if util.FileExists(ecxFilePath) { + // Check if EC volume is valid by verifying shard count + if !l.validateEcVolume(collection, vid) { + glog.Warningf("EC volume %d validation failed, removing incomplete EC files to allow .dat file loading", vid) + l.removeEcVolumeFiles(collection, vid) + // Continue to load .dat file + } else { + // Valid EC volume exists, skip .dat file + return false + } } } @@ -161,13 +177,6 @@ func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind Ne return false } - // parse out collection, volume id - vid, collection, err := volumeIdFromFileName(basename) - if err != nil { - glog.Warningf("get volume id failed, %s, err : %s", volumeName, err) - return false - } - // avoid loading one volume more than once l.volumesLock.RLock() _, found := l.volumes[vid] diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go index 0db73adc6..e710c87d4 100644 --- a/weed/storage/disk_location_ec.go +++ b/weed/storage/disk_location_ec.go @@ -10,8 +10,10 @@ import ( "slices" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/util" ) var ( @@ -188,14 +190,65 @@ func (l *DiskLocation) loadAllEcShards() (err error) { } if ext == ".ecx" && volumeId == prevVolumeId { + // Check if this is an incomplete EC encoding (not a distributed EC volume) + // Key distinction: if .dat file still exists, EC encoding may have failed + // If .dat file is gone, this is likely a distributed EC volume with shards on multiple servers + baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(volumeId)) + datFileName := baseFileName + ".dat" + datExists := util.FileExists(datFileName) + + // Only validate shard count if .dat file exists (incomplete EC encoding scenario) + // If .dat is gone, EC encoding completed and shards are distributed across servers + if datExists && len(sameVolumeShards) < erasure_coding.DataShardsCount { + glog.Warningf("Incomplete EC encoding for volume %d: .dat exists but only %d shards found (need at least %d), cleaning up EC files...", + volumeId, len(sameVolumeShards), erasure_coding.DataShardsCount) + l.removeEcVolumeFiles(collection, volumeId) + sameVolumeShards = nil + prevVolumeId = 0 + continue + } + if err = l.loadEcShards(sameVolumeShards, collection, volumeId); err != nil { - return fmt.Errorf("loadEcShards collection:%v volumeId:%d : %v", collection, volumeId, err) + // If EC shards failed to load and .dat still exists, clean up EC files to allow .dat file to be used + // If .dat is gone, log error but don't clean up (may be waiting for shards from other servers) + if datExists { + glog.Warningf("Failed to load EC shards for volume %d and .dat exists: %v, cleaning up EC files to use .dat...", volumeId, err) + l.removeEcVolumeFiles(collection, volumeId) + } else { + glog.Warningf("Failed to load EC shards for volume %d: %v (this may be normal for distributed EC volumes)", volumeId, err) + } + sameVolumeShards = nil + prevVolumeId = 0 + continue } prevVolumeId = volumeId + sameVolumeShards = nil continue } } + + // Check for orphaned EC shards without .ecx file (incomplete EC encoding) + // This happens when encoding is interrupted after writing shards but before writing .ecx + if len(sameVolumeShards) > 0 && prevVolumeId != 0 { + // We have collected EC shards but never found .ecx file + // Need to determine the collection name from the shard filenames + if len(sameVolumeShards) > 0 { + baseName := sameVolumeShards[0][:len(sameVolumeShards[0])-len(path.Ext(sameVolumeShards[0]))] + collection, volumeId, err := parseCollectionVolumeId(baseName) + if err == nil && volumeId == prevVolumeId { + baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(volumeId)) + datFileName := baseFileName + ".dat" + // Only clean up if .dat file exists (incomplete encoding, not distributed EC) + if util.FileExists(datFileName) { + glog.Warningf("Found %d EC shards without .ecx file for volume %d (incomplete encoding interrupted before .ecx creation), cleaning up...", + len(sameVolumeShards), volumeId) + l.removeEcVolumeFiles(collection, volumeId) + } + } + } + } + return nil } @@ -237,3 +290,62 @@ func (l *DiskLocation) EcShardCount() int { } return shardCount } + +// validateEcVolume checks if EC volume has enough shards to be functional +// For distributed EC volumes (where .dat is deleted), any number of shards is valid +// For incomplete EC encoding (where .dat still exists), we need at least DataShardsCount shards +func (l *DiskLocation) validateEcVolume(collection string, vid needle.VolumeId) bool { + baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(vid)) + datFileName := baseFileName + ".dat" + datExists := util.FileExists(datFileName) + shardCount := 0 + + // Count existing EC shard files + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + shardFileName := baseFileName + erasure_coding.ToExt(i) + if util.FileExists(shardFileName) { + // Check if file has non-zero size + if fi, err := os.Stat(shardFileName); err == nil && fi.Size() > 0 { + shardCount++ + } + } + } + + // If .dat file is gone, this is a distributed EC volume - any shard count is valid + if !datExists { + glog.V(1).Infof("EC volume %d has %d shards (distributed EC, .dat removed)", vid, shardCount) + return true + } + + // If .dat file exists, we need at least DataShardsCount shards locally + // Otherwise it's an incomplete EC encoding that should be cleaned up + if shardCount < erasure_coding.DataShardsCount { + glog.V(0).Infof("EC volume %d has .dat file but only %d shards (need at least %d for local EC)", + vid, shardCount, erasure_coding.DataShardsCount) + return false + } + + return true +} + +// removeEcVolumeFiles removes all EC-related files for a volume +func (l *DiskLocation) removeEcVolumeFiles(collection string, vid needle.VolumeId) { + baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(vid)) + indexBaseFileName := erasure_coding.EcShardFileName(collection, l.IdxDirectory, int(vid)) + + // Remove all EC shard files (.ec00 ~ .ec13) + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + shardFileName := baseFileName + erasure_coding.ToExt(i) + if err := os.Remove(shardFileName); err == nil { + glog.V(0).Infof("Removed incomplete EC shard file: %s", shardFileName) + } + } + + // Remove index files + if err := os.Remove(indexBaseFileName + ".ecx"); err == nil { + glog.V(0).Infof("Removed incomplete EC index file: %s.ecx", indexBaseFileName) + } + if err := os.Remove(indexBaseFileName + ".ecj"); err == nil { + glog.V(0).Infof("Removed incomplete EC journal file: %s.ecj", indexBaseFileName) + } +}