@@ -42,6 +42,18 @@ func (l *DiskLocation) DestroyEcVolume(vid needle.VolumeId) {
 	}
 }
 
+// unloadEcVolume removes an EC volume from memory without deleting its files on disk.
+// This is useful for distributed EC volumes where shards may be on other servers.
+func (l *DiskLocation) unloadEcVolume(vid needle.VolumeId) {
+	l.ecVolumesLock.Lock()
+	defer l.ecVolumesLock.Unlock()
+
+	if ecVolume, found := l.ecVolumes[vid]; found {
+		ecVolume.Close()
+		delete(l.ecVolumes, vid)
+	}
+}
+
 func (l *DiskLocation) CollectEcShards(vid needle.VolumeId, shardFileNames []string) (ecVolume *erasure_coding.EcVolume, found bool) {
 	l.ecVolumesLock.RLock()
 	defer l.ecVolumesLock.RUnlock()
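For orientation, here is a minimal standalone sketch of the behavioral contrast the new helper introduces: DestroyEcVolume drops the in-memory entry and deletes the volume's files, while unloadEcVolume drops only the in-memory entry. The types below are simplified stand-ins, not the real SeaweedFS structs, and the locking is omitted.

```go
package main

import "fmt"

// Simplified stand-ins for illustration only; the real code uses
// erasure_coding.EcVolume and guards the map with ecVolumesLock.
type VolumeId uint32

type ecVolume struct {
	onDisk bool // whether shard files still exist on disk
}

type diskLocation struct {
	ecVolumes map[VolumeId]*ecVolume
}

// destroy mirrors DestroyEcVolume: remove from memory AND delete files.
func (l *diskLocation) destroy(vid VolumeId) {
	if v, found := l.ecVolumes[vid]; found {
		v.onDisk = false // stands in for ecVolume.Destroy() unlinking shard files
		delete(l.ecVolumes, vid)
	}
}

// unload mirrors unloadEcVolume: remove from memory, keep files on disk.
func (l *diskLocation) unload(vid VolumeId) {
	if _, found := l.ecVolumes[vid]; found {
		// stands in for ecVolume.Close() releasing file handles
		delete(l.ecVolumes, vid)
	}
}

func main() {
	l := &diskLocation{ecVolumes: map[VolumeId]*ecVolume{
		1: {onDisk: true},
		2: {onDisk: true},
	}}
	v1, v2 := l.ecVolumes[1], l.ecVolumes[2]
	l.destroy(1)
	l.unload(2)
	fmt.Println(v1.onDisk, v2.onDisk) // false true: unload leaves files intact
}
```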
@@ -202,9 +214,9 @@ func (l *DiskLocation) loadAllEcShards() (err error) {
 		if datExists && len(sameVolumeShards) < erasure_coding.DataShardsCount {
 			glog.Warningf("Incomplete EC encoding for volume %d: .dat exists but only %d shards found (need at least %d), cleaning up EC files...",
 				volumeId, len(sameVolumeShards), erasure_coding.DataShardsCount)
-			// Clean up any in-memory state before removing files
-			l.DestroyEcVolume(volumeId)
 			l.removeEcVolumeFiles(collection, volumeId)
+			// Clean up any in-memory state. This does not delete files (already deleted by removeEcVolumeFiles).
+			l.unloadEcVolume(volumeId)
 			sameVolumeShards = nil
 			prevVolumeId = 0
 			continue
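This hunk and the two that follow share one pattern: files are removed first where appropriate, then only the in-memory state is dropped via unloadEcVolume, replacing the old DestroyEcVolume call that would also try to delete files. A hypothetical condensation of the decision logic in loadAllEcShards, assuming SeaweedFS's default 10+4 erasure coding scheme (DataShardsCount = 10):

```go
package main

import "fmt"

// recoveryAction is a hypothetical condensation of the cleanup decisions,
// assuming DataShardsCount = 10 as in the default 10+4 scheme.
func recoveryAction(datExists bool, shardCount int) string {
	const dataShardsCount = 10
	switch {
	case datExists && shardCount < dataShardsCount:
		// Encoding never completed; .dat is still authoritative, so EC files
		// are deleted and the volume is unloaded from memory.
		return "remove EC files, unload, fall back to .dat"
	case !datExists:
		// Shards may live on other servers; keep the files, drop only the
		// in-memory state.
		return "keep EC files, unload from memory"
	default:
		return "load the EC volume"
	}
}

func main() {
	fmt.Println(recoveryAction(true, 6))  // incomplete local encoding
	fmt.Println(recoveryAction(false, 4)) // distributed EC volume
	fmt.Println(recoveryAction(true, 14)) // fully encoded
}
```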
@@ -215,14 +227,12 @@ func (l *DiskLocation) loadAllEcShards() (err error) {
 			// If .dat is gone, log error but don't clean up (may be waiting for shards from other servers)
 			if datExists {
 				glog.Warningf("Failed to load EC shards for volume %d and .dat exists: %v, cleaning up EC files to use .dat...", volumeId, err)
-				// Clean up any partially loaded in-memory state before removing files
-				l.DestroyEcVolume(volumeId)
 				l.removeEcVolumeFiles(collection, volumeId)
 			} else {
 				glog.Warningf("Failed to load EC shards for volume %d: %v (this may be normal for distributed EC volumes)", volumeId, err)
-				// Clean up any partially loaded in-memory state even if we don't remove files
-				l.DestroyEcVolume(volumeId)
 			}
+			// Clean up any partially loaded in-memory state. This does not delete files.
+			l.unloadEcVolume(volumeId)
 			sameVolumeShards = nil
 			prevVolumeId = 0
 			continue
@@ -248,9 +258,9 @@ func (l *DiskLocation) loadAllEcShards() (err error) {
 		if util.FileExists(datFileName) {
 			glog.Warningf("Found %d EC shards without .ecx file for volume %d (incomplete encoding interrupted before .ecx creation), cleaning up...",
 				len(sameVolumeShards), volumeId)
-			// Clean up any in-memory state before removing files
-			l.DestroyEcVolume(volumeId)
 			l.removeEcVolumeFiles(collection, volumeId)
+			// Clean up any in-memory state. This does not delete files (already deleted by removeEcVolumeFiles).
+			l.unloadEcVolume(volumeId)
 		}
 	}
 }
@@ -300,6 +310,7 @@ func (l *DiskLocation) EcShardCount() int {
 // validateEcVolume checks if EC volume has enough shards to be functional
 // For distributed EC volumes (where .dat is deleted), any number of shards is valid
 // For incomplete EC encoding (where .dat still exists), we need at least DataShardsCount shards
+// Also validates that all shards have the same size (required for Reed-Solomon EC)
 func (l *DiskLocation) validateEcVolume(collection string, vid needle.VolumeId) bool {
 	baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(vid))
 	datFileName := baseFileName + ".dat"
@@ -312,13 +323,23 @@ func (l *DiskLocation) validateEcVolume(collection string, vid needle.VolumeId)
 		return true
 	}
 
-	// .dat file exists, so we need to validate shard count for local EC
+	// .dat file exists, so we need to validate shard count and size for local EC
 	shardCount := 0
+	var expectedShardSize int64 = -1
+
 	for i := 0; i < erasure_coding.TotalShardsCount; i++ {
 		shardFileName := baseFileName + erasure_coding.ToExt(i)
 		if fi, err := os.Stat(shardFileName); err == nil {
 			// Check if file has non-zero size
 			if fi.Size() > 0 {
+				// Validate all shards are the same size (required for Reed-Solomon EC)
+				if expectedShardSize == -1 {
+					expectedShardSize = fi.Size()
+				} else if fi.Size() != expectedShardSize {
+					glog.V(0).Infof("EC volume %d shard %d has size %d, expected %d (all EC shards must be same size)",
+						vid, i, fi.Size(), expectedShardSize)
+					return false
+				}
 				shardCount++
 			}
 		} else if !os.IsNotExist(err) {
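To make the size invariant concrete: Reed-Solomon coding produces equal-length shards, so any non-empty shard whose size differs from the others signals corruption or a torn write. Below is a self-contained sketch of the same check; validateShardSizes is a hypothetical helper using plain os.Stat, not the actual SeaweedFS function, and missing shards are tolerated just as the hunk above tolerates os.IsNotExist.

```go
package main

import (
	"fmt"
	"os"
)

// validateShardSizes stats each shard path and requires every non-empty
// shard to share one size, mirroring the expectedShardSize check above.
func validateShardSizes(paths []string) (count int, ok bool) {
	var expected int64 = -1
	for _, p := range paths {
		fi, err := os.Stat(p)
		if err != nil {
			if os.IsNotExist(err) {
				continue // a missing shard is fine; it may live on another server
			}
			return count, false // unexpected stat error
		}
		if fi.Size() == 0 {
			continue // zero-length shards don't count
		}
		if expected == -1 {
			expected = fi.Size() // first non-empty shard sets the expectation
		} else if fi.Size() != expected {
			fmt.Printf("shard %s has size %d, expected %d\n", p, fi.Size(), expected)
			return count, false // Reed-Solomon requires equal-size shards
		}
		count++
	}
	return count, true
}

func main() {
	// Shard names here are illustrative; SeaweedFS derives them via
	// erasure_coding.ToExt(i), e.g. ".ec00", ".ec01", ...
	count, ok := validateShardSizes([]string{"1.ec00", "1.ec01", "1.ec02"})
	fmt.Println(count, ok)
}
```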