Browse Source

handle incomplete ec encoding

pull/7384/head
chrislu 1 month ago
parent
commit
829dc8e092
  1. 29
      weed/storage/disk_location.go
  2. 114
      weed/storage/disk_location_ec.go

29
weed/storage/disk_location.go

@ -144,10 +144,26 @@ func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind Ne
return false return false
} }
// skip if ec volumes exists
// parse out collection, volume id (moved up to use in EC validation)
vid, collection, err := volumeIdFromFileName(basename)
if err != nil {
glog.Warningf("get volume id failed, %s, err : %s", volumeName, err)
return false
}
// skip if ec volumes exists, but validate EC files first
if skipIfEcVolumesExists { if skipIfEcVolumesExists {
if util.FileExists(l.IdxDirectory + "/" + volumeName + ".ecx") {
return false
ecxFilePath := l.IdxDirectory + "/" + volumeName + ".ecx"
if util.FileExists(ecxFilePath) {
// Check if EC volume is valid by verifying shard count
if !l.validateEcVolume(collection, vid) {
glog.Warningf("EC volume %d validation failed, removing incomplete EC files to allow .dat file loading", vid)
l.removeEcVolumeFiles(collection, vid)
// Continue to load .dat file
} else {
// Valid EC volume exists, skip .dat file
return false
}
} }
} }
@ -161,13 +177,6 @@ func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind Ne
return false return false
} }
// parse out collection, volume id
vid, collection, err := volumeIdFromFileName(basename)
if err != nil {
glog.Warningf("get volume id failed, %s, err : %s", volumeName, err)
return false
}
// avoid loading one volume more than once // avoid loading one volume more than once
l.volumesLock.RLock() l.volumesLock.RLock()
_, found := l.volumes[vid] _, found := l.volumes[vid]

114
weed/storage/disk_location_ec.go

@ -10,8 +10,10 @@ import (
"slices" "slices"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/util"
) )
var ( var (
@ -188,14 +190,65 @@ func (l *DiskLocation) loadAllEcShards() (err error) {
} }
if ext == ".ecx" && volumeId == prevVolumeId { if ext == ".ecx" && volumeId == prevVolumeId {
// Check if this is an incomplete EC encoding (not a distributed EC volume)
// Key distinction: if .dat file still exists, EC encoding may have failed
// If .dat file is gone, this is likely a distributed EC volume with shards on multiple servers
baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(volumeId))
datFileName := baseFileName + ".dat"
datExists := util.FileExists(datFileName)
// Only validate shard count if .dat file exists (incomplete EC encoding scenario)
// If .dat is gone, EC encoding completed and shards are distributed across servers
if datExists && len(sameVolumeShards) < erasure_coding.DataShardsCount {
glog.Warningf("Incomplete EC encoding for volume %d: .dat exists but only %d shards found (need at least %d), cleaning up EC files...",
volumeId, len(sameVolumeShards), erasure_coding.DataShardsCount)
l.removeEcVolumeFiles(collection, volumeId)
sameVolumeShards = nil
prevVolumeId = 0
continue
}
if err = l.loadEcShards(sameVolumeShards, collection, volumeId); err != nil { if err = l.loadEcShards(sameVolumeShards, collection, volumeId); err != nil {
return fmt.Errorf("loadEcShards collection:%v volumeId:%d : %v", collection, volumeId, err)
// If EC shards failed to load and .dat still exists, clean up EC files to allow .dat file to be used
// If .dat is gone, log error but don't clean up (may be waiting for shards from other servers)
if datExists {
glog.Warningf("Failed to load EC shards for volume %d and .dat exists: %v, cleaning up EC files to use .dat...", volumeId, err)
l.removeEcVolumeFiles(collection, volumeId)
} else {
glog.Warningf("Failed to load EC shards for volume %d: %v (this may be normal for distributed EC volumes)", volumeId, err)
}
sameVolumeShards = nil
prevVolumeId = 0
continue
} }
prevVolumeId = volumeId prevVolumeId = volumeId
sameVolumeShards = nil
continue continue
} }
} }
// Check for orphaned EC shards without .ecx file (incomplete EC encoding)
// This happens when encoding is interrupted after writing shards but before writing .ecx
if len(sameVolumeShards) > 0 && prevVolumeId != 0 {
// We have collected EC shards but never found .ecx file
// Need to determine the collection name from the shard filenames
if len(sameVolumeShards) > 0 {
baseName := sameVolumeShards[0][:len(sameVolumeShards[0])-len(path.Ext(sameVolumeShards[0]))]
collection, volumeId, err := parseCollectionVolumeId(baseName)
if err == nil && volumeId == prevVolumeId {
baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(volumeId))
datFileName := baseFileName + ".dat"
// Only clean up if .dat file exists (incomplete encoding, not distributed EC)
if util.FileExists(datFileName) {
glog.Warningf("Found %d EC shards without .ecx file for volume %d (incomplete encoding interrupted before .ecx creation), cleaning up...",
len(sameVolumeShards), volumeId)
l.removeEcVolumeFiles(collection, volumeId)
}
}
}
}
return nil return nil
} }
@ -237,3 +290,62 @@ func (l *DiskLocation) EcShardCount() int {
} }
return shardCount return shardCount
} }
// validateEcVolume checks if EC volume has enough shards to be functional
// For distributed EC volumes (where .dat is deleted), any number of shards is valid
// For incomplete EC encoding (where .dat still exists), we need at least DataShardsCount shards
func (l *DiskLocation) validateEcVolume(collection string, vid needle.VolumeId) bool {
baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(vid))
datFileName := baseFileName + ".dat"
datExists := util.FileExists(datFileName)
shardCount := 0
// Count existing EC shard files
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
shardFileName := baseFileName + erasure_coding.ToExt(i)
if util.FileExists(shardFileName) {
// Check if file has non-zero size
if fi, err := os.Stat(shardFileName); err == nil && fi.Size() > 0 {
shardCount++
}
}
}
// If .dat file is gone, this is a distributed EC volume - any shard count is valid
if !datExists {
glog.V(1).Infof("EC volume %d has %d shards (distributed EC, .dat removed)", vid, shardCount)
return true
}
// If .dat file exists, we need at least DataShardsCount shards locally
// Otherwise it's an incomplete EC encoding that should be cleaned up
if shardCount < erasure_coding.DataShardsCount {
glog.V(0).Infof("EC volume %d has .dat file but only %d shards (need at least %d for local EC)",
vid, shardCount, erasure_coding.DataShardsCount)
return false
}
return true
}
// removeEcVolumeFiles removes all EC-related files for a volume
func (l *DiskLocation) removeEcVolumeFiles(collection string, vid needle.VolumeId) {
baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(vid))
indexBaseFileName := erasure_coding.EcShardFileName(collection, l.IdxDirectory, int(vid))
// Remove all EC shard files (.ec00 ~ .ec13)
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
shardFileName := baseFileName + erasure_coding.ToExt(i)
if err := os.Remove(shardFileName); err == nil {
glog.V(0).Infof("Removed incomplete EC shard file: %s", shardFileName)
}
}
// Remove index files
if err := os.Remove(indexBaseFileName + ".ecx"); err == nil {
glog.V(0).Infof("Removed incomplete EC index file: %s.ecx", indexBaseFileName)
}
if err := os.Remove(indexBaseFileName + ".ecj"); err == nil {
glog.V(0).Infof("Removed incomplete EC journal file: %s.ecj", indexBaseFileName)
}
}
Loading…
Cancel
Save