
fix: replace TotalShardsCount with MaxShardCount in critical data structures

Critical fixes for buffer allocations and loops that must support
custom EC ratios up to 32 shards:

Data Structures:
- store_ec.go:354: Buffer allocation for shard recovery (bufs array)
- topology_ec.go:14: EcShardLocations.Locations fixed array size
- command_ec_rebuild.go:268: EC shard map allocation
- command_ec_common.go:626: Shard-to-locations map allocation
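
For reference, a minimal sketch of the constants involved (the 14 and 32
totals are stated in this commit; the 10+4 default split is assumed here):

    const (
        DataShardsCount   = 10 // assumed default data shard count
        ParityShardsCount = 4  // assumed default parity shard count
        TotalShardsCount  = DataShardsCount + ParityShardsCount // 14, the default ratio
        MaxShardCount     = 32 // upper bound for custom EC ratios
    )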

Shard Discovery Loops:
- ec_task.go:378: Loop to find generated shard files
- ec_shard_management.go: All 8 loops that check/count EC shards
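
Iterating to MaxShardCount stays correct for default 14-shard volumes
because bits 14-31 of EcIndexBits are zero and the loop body skips them.
A minimal sketch of the pattern these loops now share:

    // shardBits holds one bit per shard id; unset bits are skipped,
    // so scanning all 32 possible ids is harmless for 14-shard volumes.
    for shardId := 0; shardId < erasure_coding.MaxShardCount; shardId++ {
        if (shardBits & (1 << uint(shardId))) != 0 {
            // shard shardId is present on this server
        }
    }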

These changes are critical because:
1. Buffer allocations sized to 14 would cause index-out-of-bounds panics
   when accessing shards 14-31
2. Fixed arrays sized to 14 would truncate shard location data
3. Loops limited to 0-13 would never discover/manage shards 14-31
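
A toy illustration of point 1 (not code from the tree):

    bufs := make([][]byte, 14) // sized to the old TotalShardsCount
    _ = bufs[20]               // panic: index out of range [20] with length 14

    bufs = make([][]byte, erasure_coding.MaxShardCount) // 32 entries
    _ = bufs[20]               // safe for any shard id 0-31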

Note: command_ec_encode.go:208 intentionally NOT changed - it creates
shard IDs to mount after encoding. In Phase 1 we always generate 14
shards, so this remains TotalShardsCount and will be made dynamic in
Phase 2 based on actual EC context.
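
As a purely hypothetical sketch of the Phase 2 direction (none of these
names exist yet), the mount list would be derived from the volume's
actual EC configuration instead of the constant:

    // hypothetical: totalShards comes from the EC context,
    // not from erasure_coding.TotalShardsCount
    shardIds := make([]uint32, 0, totalShards)
    for i := 0; i < totalShards; i++ {
        shardIds = append(shardIds, uint32(i))
    }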

Without these fixes, custom EC ratios > 14 total shards would cause:
- Runtime panics (array index out of bounds)
- Data loss (shards 14-31 never discovered/tracked)
- Incomplete shard management (missing shards not detected)
pull/7396/head
chrislu · 1 month ago · commit 1338df5e23
Changed files:
- weed/admin/dash/ec_shard_management.go (20 lines)
- weed/shell/command_ec_common.go (3 lines)
- weed/shell/command_ec_rebuild.go (13 lines)
- weed/storage/store_ec.go (3 lines)
- weed/topology/topology_ec.go (3 lines)
- weed/worker/tasks/erasure_coding/ec_task.go (3 lines)

weed/admin/dash/ec_shard_management.go

@@ -68,7 +68,7 @@ func (s *AdminServer) GetClusterEcShards(page int, pageSize int, sortBy string,
// Create individual shard entries for each shard this server has
shardBits := ecShardInfo.EcIndexBits
-for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
+for shardId := 0; shardId < erasure_coding.MaxShardCount; shardId++ {
if (shardBits & (1 << uint(shardId))) != 0 {
// Mark this shard as present for this volume
volumeShardsMap[volumeId][shardId] = true
@@ -112,13 +112,13 @@ func (s *AdminServer) GetClusterEcShards(page int, pageSize int, sortBy string,
shardCount := len(shardsPresent)
// Find which shards are missing for this volume across ALL servers
-for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
+for shardId := 0; shardId < erasure_coding.MaxShardCount; shardId++ {
if !shardsPresent[shardId] {
missingShards = append(missingShards, shardId)
}
}
-isComplete := (shardCount == erasure_coding.TotalShardsCount)
+isComplete := (shardCount == erasure_coding.MaxShardCount)
volumeCompleteness[volumeId] = isComplete
volumeMissingShards[volumeId] = missingShards
@@ -332,7 +332,7 @@ func (s *AdminServer) GetClusterEcVolumes(page int, pageSize int, sortBy string,
// Process each shard this server has for this volume
shardBits := ecShardInfo.EcIndexBits
-for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
+for shardId := 0; shardId < erasure_coding.MaxShardCount; shardId++ {
if (shardBits & (1 << uint(shardId))) != 0 {
// Record shard location
volume.ShardLocations[shardId] = node.Id
@@ -394,7 +394,7 @@ func (s *AdminServer) GetClusterEcVolumes(page int, pageSize int, sortBy string,
// Find missing shards
var missingShards []int
-for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
+for shardId := 0; shardId < erasure_coding.MaxShardCount; shardId++ {
if _, exists := volume.ShardLocations[shardId]; !exists {
missingShards = append(missingShards, shardId)
}
@@ -523,7 +523,7 @@ func sortEcVolumes(volumes []EcVolumeWithShards, sortBy string, sortOrder string
// getShardCount returns the number of shards represented by the bitmap
func getShardCount(ecIndexBits uint32) int {
count := 0
-for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+for i := 0; i < erasure_coding.MaxShardCount; i++ {
if (ecIndexBits & (1 << uint(i))) != 0 {
count++
}
@@ -534,7 +534,7 @@ func getShardCount(ecIndexBits uint32) int {
// getMissingShards returns a slice of missing shard IDs for a volume
func getMissingShards(ecIndexBits uint32) []int {
var missing []int
-for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+for i := 0; i < erasure_coding.MaxShardCount; i++ {
if (ecIndexBits & (1 << uint(i))) == 0 {
missing = append(missing, i)
}
@@ -614,7 +614,7 @@ func (s *AdminServer) GetEcVolumeDetails(volumeID uint32, sortBy string, sortOrd
// Create individual shard entries for each shard this server has
shardBits := ecShardInfo.EcIndexBits
-for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
+for shardId := 0; shardId < erasure_coding.MaxShardCount; shardId++ {
if (shardBits & (1 << uint(shardId))) != 0 {
ecShard := EcShardWithInfo{
VolumeID: ecShardInfo.Id,
@@ -698,11 +698,11 @@ func (s *AdminServer) GetEcVolumeDetails(volumeID uint32, sortBy string, sortOrd
}
totalUniqueShards := len(foundShards)
-isComplete := (totalUniqueShards == erasure_coding.TotalShardsCount)
+isComplete := (totalUniqueShards == erasure_coding.MaxShardCount)
// Calculate missing shards
var missingShards []int
-for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+for i := 0; i < erasure_coding.MaxShardCount; i++ {
if !foundShards[i] {
missingShards = append(missingShards, i)
}
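
A quick check of getShardCount as changed above: a complete default
volume sets bits 0-13, and the wider loop still counts it correctly
because the upper bits are zero.

    getShardCount(0x3FFF) // bits 0-13 set -> 14
    getShardCount(0x0005) // bits 0 and 2 set -> 2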

weed/shell/command_ec_common.go

@@ -622,7 +622,8 @@ func (ecb *ecBalancer) deleteDuplicatedEcShards(collection string) error {
func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.VolumeId, locations []*EcNode) error {
// check whether this volume has ecNodes that are over average
-shardToLocations := make([][]*EcNode, erasure_coding.TotalShardsCount)
+// Use MaxShardCount (32) to support custom EC ratios
+shardToLocations := make([][]*EcNode, erasure_coding.MaxShardCount)
for _, ecNode := range locations {
shardBits := findEcVolumeShards(ecNode, vid)
for _, shardId := range shardBits.ShardIds() {
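
For context, ShardBits.ShardIds() (used above, and again in the rebuild
diff below) expands the bitmap into concrete shard ids, so any slice
indexed by those ids needs MaxShardCount slots:

    erasure_coding.ShardBits(0b101).ShardIds() // -> [0, 2]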

weed/shell/command_ec_rebuild.go

@@ -261,12 +261,13 @@ type EcShardLocations [][]*EcNode
func (ecShardMap EcShardMap) registerEcNode(ecNode *EcNode, collection string) {
for _, diskInfo := range ecNode.info.DiskInfos {
for _, shardInfo := range diskInfo.EcShardInfos {
-if shardInfo.Collection == collection {
-existing, found := ecShardMap[needle.VolumeId(shardInfo.Id)]
-if !found {
-existing = make([][]*EcNode, erasure_coding.TotalShardsCount)
-ecShardMap[needle.VolumeId(shardInfo.Id)] = existing
-}
+if shardInfo.Collection == collection {
+existing, found := ecShardMap[needle.VolumeId(shardInfo.Id)]
+if !found {
+// Use MaxShardCount (32) to support custom EC ratios
+existing = make([][]*EcNode, erasure_coding.MaxShardCount)
+ecShardMap[needle.VolumeId(shardInfo.Id)] = existing
+}
for _, shardId := range erasure_coding.ShardBits(shardInfo.EcIndexBits).ShardIds() {
existing[shardId] = append(existing[shardId], ecNode)
}
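
The lazy-allocation pattern above in isolation (simplified; the real map
is keyed by needle.VolumeId):

    locations, found := ecShardMap[vid]
    if !found {
        locations = make([][]*EcNode, erasure_coding.MaxShardCount)
        ecShardMap[vid] = locations
    }
    locations[shardId] = append(locations[shardId], ecNode)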

weed/storage/store_ec.go

@@ -350,7 +350,8 @@ func (s *Store) recoverOneRemoteEcShardInterval(needleId types.NeedleId, ecVolum
return 0, false, fmt.Errorf("failed to create encoder: %w", err)
}
-bufs := make([][]byte, erasure_coding.TotalShardsCount)
+// Use MaxShardCount to support custom EC ratios up to 32 shards
+bufs := make([][]byte, erasure_coding.MaxShardCount)
var wg sync.WaitGroup
ecVolume.ShardLocationsLock.RLock()
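
Sizing bufs to MaxShardCount guarantees that indexing by any shard id
0-31 cannot go out of bounds while shard reads are in flight; the encoder
itself is still built for the volume's actual ratio. A minimal sketch of
the indexing this protects:

    bufs := make([][]byte, erasure_coding.MaxShardCount)
    bufs[shardId] = make([]byte, bufferSize) // safe even for shard ids 14-31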

weed/topology/topology_ec.go

@@ -10,7 +10,8 @@ import (
type EcShardLocations struct {
Collection string
-Locations [erasure_coding.TotalShardsCount][]*DataNode
+// Use MaxShardCount (32) to support custom EC ratios
+Locations [erasure_coding.MaxShardCount][]*DataNode
}
func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
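
With the old 14-slot array, a runtime shard id above 13 would have
panicked on index; the 32-slot array makes every valid custom-ratio id
addressable. A minimal sketch (dn stands in for a *DataNode):

    var loc EcShardLocations
    loc.Locations[20] = append(loc.Locations[20], dn) // valid with 32 slots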

weed/worker/tasks/erasure_coding/ec_task.go

@@ -374,7 +374,8 @@ func (t *ErasureCodingTask) generateEcShardsLocally(localFiles map[string]string
var generatedShards []string
var totalShardSize int64
-for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+// Check up to MaxShardCount (32) to support custom EC ratios
+for i := 0; i < erasure_coding.MaxShardCount; i++ {
shardFile := fmt.Sprintf("%s.ec%02d", baseName, i)
if info, err := os.Stat(shardFile); err == nil {
shardKey := fmt.Sprintf("ec%02d", i)
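
The discovery loop relies on the on-disk naming shown above: shard i of a
volume lives at <baseName>.ec00 through .ec31. For example (path is
illustrative):

    fmt.Sprintf("%s.ec%02d", "/data/v123", 17) // -> "/data/v123.ec17"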
