diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go index 27ce21865..120e3dbc9 100644 --- a/weed/command/filer_backup.go +++ b/weed/command/filer_backup.go @@ -83,7 +83,6 @@ func runFilerBackup(cmd *Command, args []string) bool { time.Sleep(1747 * time.Millisecond) } } - return false } const ( diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go index 197e69e73..3454ea72c 100644 --- a/weed/command/filer_meta_backup.go +++ b/weed/command/filer_meta_backup.go @@ -126,7 +126,6 @@ func runFilerMetaBackup(cmd *Command, args []string) bool { time.Sleep(1747 * time.Millisecond) } } - return false } func (metaBackup *FilerMetaBackupOptions) initStore(v *viper.Viper) error { diff --git a/weed/shell/command_cluster_status.go b/weed/shell/command_cluster_status.go index 1070c257b..7a31c34e5 100644 --- a/weed/shell/command_cluster_status.go +++ b/weed/shell/command_cluster_status.go @@ -310,7 +310,7 @@ func (sp *ClusterStatusPrinter) printVolumeInfo() { for _, eci := range di.EcShardInfos { vid := needle.VolumeId(eci.Id) ecVolumeIds[vid] = true - ecShards += erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(eci) + ecShards += erasure_coding.GetShardCount(eci) } } } diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index 82d661289..1f6f0e005 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -415,7 +415,7 @@ func swap(data []*CandidateEcNode, i, j int) { func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) { for _, eci := range ecShardInfos { - count += erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(eci) + count += erasure_coding.GetShardCount(eci) } return } @@ -441,7 +441,7 @@ func (ecNode *EcNode) localShardIdCount(vid uint32) int { for _, diskInfo := range ecNode.info.DiskInfos { for _, eci := range diskInfo.EcShardInfos { if vid == eci.Id { - return erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(eci) + return erasure_coding.GetShardCount(eci) } } } @@ -602,7 +602,7 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, si := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(ecsi) oldShardCount := si.Count() for _, shardId := range shardIds { - si.Set(shardId, 0) + si.Set(erasure_coding.NewShardInfo(shardId, 0)) } ecsi.EcIndexBits = si.Bitmap() ecsi.ShardSizes = si.SizesInt64() @@ -621,7 +621,7 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, if !foundVolume { si := erasure_coding.NewShardsInfo() for _, id := range shardIds { - si.Set(id, 0) + si.Set(erasure_coding.NewShardInfo(id, 0)) } diskInfo.EcShardInfos = append(diskInfo.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{ Id: uint32(vid), @@ -1107,7 +1107,7 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error { return } for _, ecShardInfo := range diskInfo.EcShardInfos { - count += erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(ecShardInfo) + count += erasure_coding.GetShardCount(ecShardInfo) } return ecNode.info.Id, count }) diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go index dc1b6e6cc..71e05893e 100644 --- a/weed/shell/command_ec_rebuild.go +++ b/weed/shell/command_ec_rebuild.go @@ -149,7 +149,7 @@ func (erb *ecRebuilder) countLocalShards(node *EcNode, collection string, volume for _, diskInfo := range node.info.DiskInfos { for _, ecShardInfo := range diskInfo.EcShardInfos { if ecShardInfo.Collection == collection 
&& needle.VolumeId(ecShardInfo.Id) == volumeId { - return erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(ecShardInfo) + return erasure_coding.GetShardCount(ecShardInfo) } } } diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index f86d5ff94..1a2abf1d0 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -255,7 +255,7 @@ func capacityByMaxVolumeCount(diskType types.DiskType) CapacityFunc { } var ecShardCount int for _, ecShardInfo := range diskInfo.EcShardInfos { - ecShardCount += erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(ecShardInfo) + ecShardCount += erasure_coding.GetShardCount(ecShardInfo) } return float64(diskInfo.MaxVolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount } @@ -269,7 +269,7 @@ func capacityByFreeVolumeCount(diskType types.DiskType) CapacityFunc { } var ecShardCount int for _, ecShardInfo := range diskInfo.EcShardInfos { - ecShardCount += erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(ecShardInfo) + ecShardCount += erasure_coding.GetShardCount(ecShardInfo) } return float64(diskInfo.MaxVolumeCount-diskInfo.VolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount } diff --git a/weed/shell/command_volume_list_test.go b/weed/shell/command_volume_list_test.go index a0d272d80..aa7d82082 100644 --- a/weed/shell/command_volume_list_test.go +++ b/weed/shell/command_volume_list_test.go @@ -115,7 +115,7 @@ func parseOutput(output string) *master_pb.TopologyInfo { shardsInfo := erasure_coding.NewShardsInfo() for _, shardId := range strings.Split(shards, ",") { sid, _ := strconv.Atoi(shardId) - shardsInfo.Set(erasure_coding.ShardId(sid), 0) + shardsInfo.Set(erasure_coding.NewShardInfo(erasure_coding.ShardId(sid), 0)) } ecShard.EcIndexBits = shardsInfo.Bitmap() ecShard.ShardSizes = shardsInfo.SizesInt64() diff --git a/weed/shell/ec_rebalance_slots_test.go b/weed/shell/ec_rebalance_slots_test.go index 093df1ae3..f47ddbb13 100644 --- a/weed/shell/ec_rebalance_slots_test.go +++ b/weed/shell/ec_rebalance_slots_test.go @@ -44,7 +44,7 @@ func TestECRebalanceWithLimitedSlots(t *testing.T) { shardCount := 0 for _, diskInfo := range node.info.DiskInfos { for _, ecShard := range diskInfo.EcShardInfos { - shardCount += erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(ecShard) + shardCount += erasure_coding.GetShardCount(ecShard) } } t.Logf(" Node %s (rack %s): %d shards, %d free slots", @@ -56,7 +56,7 @@ func TestECRebalanceWithLimitedSlots(t *testing.T) { for _, node := range ecNodes { for _, diskInfo := range node.info.DiskInfos { for _, ecShard := range diskInfo.EcShardInfos { - totalEcShards += erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(ecShard) + totalEcShards += erasure_coding.GetShardCount(ecShard) } } } @@ -122,7 +122,7 @@ func TestECRebalanceZeroFreeSlots(t *testing.T) { shardCount := 0 for _, diskInfo := range node.info.DiskInfos { for _, ecShard := range diskInfo.EcShardInfos { - shardCount += erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(ecShard) + shardCount += erasure_coding.GetShardCount(ecShard) } } t.Logf(" Node %s: %d shards, %d free slots, volumeCount=%d, max=%d", @@ -228,7 +228,7 @@ func buildEcShards(volumeIds []uint32) []*master_pb.VolumeEcShardInformationMess for _, vid := range volumeIds { si := erasure_coding.NewShardsInfo() for _, id := range erasure_coding.AllShardIds() { - si.Set(id, 1234) + si.Set(erasure_coding.NewShardInfo(id, 1234)) } shards = append(shards, 
&master_pb.VolumeEcShardInformationMessage{ Id: vid, diff --git a/weed/storage/erasure_coding/ec_shard_info.go b/weed/storage/erasure_coding/ec_shard_info.go new file mode 100644 index 000000000..fafb62a1a --- /dev/null +++ b/weed/storage/erasure_coding/ec_shard_info.go @@ -0,0 +1,23 @@ +package erasure_coding + +// ShardSize represents the size of a shard in bytes +type ShardSize int64 + +// ShardInfo holds information about a single shard +type ShardInfo struct { + Id ShardId + Size ShardSize +} + +// NewShardInfo creates a new ShardInfo with the given ID and size +func NewShardInfo(id ShardId, size ShardSize) ShardInfo { + return ShardInfo{ + Id: id, + Size: size, + } +} + +// IsValid checks if the shard info has a valid ID +func (si ShardInfo) IsValid() bool { + return si.Id < MaxShardCount +} diff --git a/weed/storage/erasure_coding/ec_shards_info.go b/weed/storage/erasure_coding/ec_shards_info.go new file mode 100644 index 000000000..55838eb4e --- /dev/null +++ b/weed/storage/erasure_coding/ec_shards_info.go @@ -0,0 +1,358 @@ +package erasure_coding + +import ( + "fmt" + "math/bits" + "sort" + "strings" + "sync" + + "github.com/dustin/go-humanize" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" +) + +// ShardBits is a bitmap representing which shards are present (bit 0 = shard 0, etc.) +type ShardBits uint32 + +// Has checks if a shard ID is present in the bitmap +func (sb ShardBits) Has(id ShardId) bool { + return id < MaxShardCount && sb&(1<<id) != 0 +} + +// Set sets a shard ID in the bitmap +func (sb ShardBits) Set(id ShardId) ShardBits { + if id >= MaxShardCount { + return sb + } + return sb | (1 << id) +} + +// Clear clears a shard ID from the bitmap +func (sb ShardBits) Clear(id ShardId) ShardBits { + if id >= MaxShardCount { + return sb + } + return sb &^ (1 << id) +} + +// Count returns the number of set bits using popcount +func (sb ShardBits) Count() int { + return bits.OnesCount32(uint32(sb)) +} + +// ShardsInfo encapsulates information for EC shards with memory-efficient storage +type ShardsInfo struct { + mu sync.RWMutex + shards []ShardInfo // Sorted by Id + shardBits ShardBits +} + +func NewShardsInfo() *ShardsInfo { + return &ShardsInfo{ + shards: make([]ShardInfo, 0, TotalShardsCount), + } +} + +// Initializes a ShardsInfo from an EcVolume. +func ShardsInfoFromVolume(ev *EcVolume) *ShardsInfo { + res := &ShardsInfo{ + shards: make([]ShardInfo, len(ev.Shards)), + } + // Build shards directly to avoid locking in Set() since res is not yet shared + for i, s := range ev.Shards { + res.shards[i] = NewShardInfo(s.ShardId, ShardSize(s.Size())) + res.shardBits = res.shardBits.Set(s.ShardId) + } + return res +} + +// Initializes a ShardsInfo from a VolumeEcShardInformationMessage proto. +func ShardsInfoFromVolumeEcShardInformationMessage(vi *master_pb.VolumeEcShardInformationMessage) *ShardsInfo { + res := NewShardsInfo() + if vi == nil { + return res + } + + var id ShardId + var j int + // Build shards directly to avoid locking in Set() since res is not yet shared + newShards := make([]ShardInfo, 0, 8) + for bitmap := vi.EcIndexBits; bitmap != 0; bitmap >>= 1 { + if bitmap&1 != 0 { + var size ShardSize + if j < len(vi.ShardSizes) { + size = ShardSize(vi.ShardSizes[j]) + } + j++ + newShards = append(newShards, NewShardInfo(id, size)) + } + id++ + } + res.shards = newShards + res.shardBits = ShardBits(vi.EcIndexBits) + + return res +} + +// Returns a count of shards from a VolumeEcShardInformationMessage proto.
+func GetShardCount(vi *master_pb.VolumeEcShardInformationMessage) int { + if vi == nil { + return 0 + } + return ShardBits(vi.EcIndexBits).Count() +} + +// Returns a string representation for a ShardsInfo. +func (sp *ShardsInfo) String() string { + sp.mu.RLock() + defer sp.mu.RUnlock() + var sb strings.Builder + for i, s := range sp.shards { + if i > 0 { + sb.WriteString(" ") + } + fmt.Fprintf(&sb, "%d:%s", s.Id, humanize.Bytes(uint64(s.Size))) + } + return sb.String() +} + +// AsSlice converts a ShardsInfo to a slice of ShardInfo structs, ordered by shard ID. +func (si *ShardsInfo) AsSlice() []ShardInfo { + si.mu.RLock() + defer si.mu.RUnlock() + res := make([]ShardInfo, len(si.shards)) + copy(res, si.shards) + return res +} + +// Count returns the number of EC shards using popcount on the bitmap. +func (si *ShardsInfo) Count() int { + si.mu.RLock() + defer si.mu.RUnlock() + return si.shardBits.Count() +} + +// Has verifies if a shard ID is present using bitmap check. +func (si *ShardsInfo) Has(id ShardId) bool { + si.mu.RLock() + defer si.mu.RUnlock() + return si.shardBits.Has(id) +} + +// Ids returns a list of shard IDs, in ascending order. +func (si *ShardsInfo) Ids() []ShardId { + si.mu.RLock() + defer si.mu.RUnlock() + ids := make([]ShardId, len(si.shards)) + for i, s := range si.shards { + ids[i] = s.Id + } + return ids +} + +// IdsInt returns a list of shards ID as int, in ascending order. +func (si *ShardsInfo) IdsInt() []int { + ids := si.Ids() + res := make([]int, len(ids)) + for i, id := range ids { + res[i] = int(id) + } + return res +} + +// IdsUint32 returns a list of shards ID as uint32, in ascending order. +func (si *ShardsInfo) IdsUint32() []uint32 { + return ShardIdsToUint32(si.Ids()) +} + +// Set sets or updates a shard's information. +func (si *ShardsInfo) Set(shard ShardInfo) { + if shard.Id >= MaxShardCount { + return + } + si.mu.Lock() + defer si.mu.Unlock() + + // Check if already exists + if si.shardBits.Has(shard.Id) { + // Find and update + idx := si.findIndex(shard.Id) + if idx >= 0 { + si.shards[idx] = shard + } + return + } + + // Add new shard + si.shardBits = si.shardBits.Set(shard.Id) + + // Find insertion point to keep sorted + idx := sort.Search(len(si.shards), func(i int) bool { + return si.shards[i].Id > shard.Id + }) + + // Insert at idx + si.shards = append(si.shards, ShardInfo{}) + copy(si.shards[idx+1:], si.shards[idx:]) + si.shards[idx] = shard +} + +// Delete deletes a shard by ID. +func (si *ShardsInfo) Delete(id ShardId) { + if id >= MaxShardCount { + return + } + si.mu.Lock() + defer si.mu.Unlock() + + if !si.shardBits.Has(id) { + return // Not present + } + + si.shardBits = si.shardBits.Clear(id) + + // Find and remove from slice + idx := si.findIndex(id) + if idx >= 0 { + si.shards = append(si.shards[:idx], si.shards[idx+1:]...) + } +} + +// Bitmap returns a bitmap for all existing shard IDs. +func (si *ShardsInfo) Bitmap() uint32 { + si.mu.RLock() + defer si.mu.RUnlock() + return uint32(si.shardBits) +} + +// Size returns the size of a given shard ID, if present. +func (si *ShardsInfo) Size(id ShardId) ShardSize { + if id >= MaxShardCount { + return 0 + } + si.mu.RLock() + defer si.mu.RUnlock() + + if !si.shardBits.Has(id) { + return 0 + } + + idx := si.findIndex(id) + if idx >= 0 { + return si.shards[idx].Size + } + return 0 +} + +// TotalSize returns the size for all shards. 
+func (si *ShardsInfo) TotalSize() ShardSize { + si.mu.RLock() + defer si.mu.RUnlock() + var total ShardSize + for _, s := range si.shards { + total += s.Size + } + return total +} + +// Sizes returns a compact slice of present shard sizes, from first to last. +func (si *ShardsInfo) Sizes() []ShardSize { + si.mu.RLock() + defer si.mu.RUnlock() + + res := make([]ShardSize, len(si.shards)) + for i, s := range si.shards { + res[i] = s.Size + } + return res +} + +// SizesInt64 returns a compact slice of present shard sizes, from first to last, as int64. +func (si *ShardsInfo) SizesInt64() []int64 { + sizes := si.Sizes() + res := make([]int64, len(sizes)) + for i, s := range sizes { + res[i] = int64(s) + } + return res +} + +// Copy creates a copy of a ShardsInfo. +func (si *ShardsInfo) Copy() *ShardsInfo { + si.mu.RLock() + defer si.mu.RUnlock() + + newShards := make([]ShardInfo, len(si.shards)) + copy(newShards, si.shards) + + return &ShardsInfo{ + shards: newShards, + shardBits: si.shardBits, + } +} + +// DeleteParityShards removes parity shards from a ShardsInfo. +func (si *ShardsInfo) DeleteParityShards() { + for id := DataShardsCount; id < TotalShardsCount; id++ { + si.Delete(ShardId(id)) + } +} + +// MinusParityShards creates a ShardsInfo copy, but with parity shards removed. +func (si *ShardsInfo) MinusParityShards() *ShardsInfo { + result := si.Copy() + result.DeleteParityShards() + return result +} + +// Add merges all shards from another ShardsInfo into this one. +func (si *ShardsInfo) Add(other *ShardsInfo) { + other.mu.RLock() + // Copy shards to avoid holding lock on 'other' while calling si.Set, which could deadlock. + shardsToAdd := make([]ShardInfo, len(other.shards)) + copy(shardsToAdd, other.shards) + other.mu.RUnlock() + + for _, s := range shardsToAdd { + si.Set(s) + } +} + +// Subtract removes all shards present on another ShardsInfo. +func (si *ShardsInfo) Subtract(other *ShardsInfo) { + other.mu.RLock() + // Copy shards to avoid holding lock on 'other' while calling si.Delete, which could deadlock. +	shardsToRemove := make([]ShardInfo, len(other.shards)) + copy(shardsToRemove, other.shards) + other.mu.RUnlock() + + for _, s := range shardsToRemove { + si.Delete(s.Id) + } +} + +// Plus returns a new ShardsInfo consisting of (this + other). +func (si *ShardsInfo) Plus(other *ShardsInfo) *ShardsInfo { + result := si.Copy() + result.Add(other) + return result +} + +// Minus returns a new ShardsInfo consisting of (this - other). +func (si *ShardsInfo) Minus(other *ShardsInfo) *ShardsInfo { + result := si.Copy() + result.Subtract(other) + return result +} + +// findIndex finds the index of a shard by ID using binary search. +// Must be called with lock held. Returns -1 if not found.
+func (si *ShardsInfo) findIndex(id ShardId) int { + idx := sort.Search(len(si.shards), func(i int) bool { + return si.shards[i].Id >= id + }) + if idx < len(si.shards) && si.shards[idx].Id == id { + return idx + } + return -1 +} diff --git a/weed/storage/erasure_coding/ec_shards_info_test.go b/weed/storage/erasure_coding/ec_shards_info_test.go new file mode 100644 index 000000000..523217417 --- /dev/null +++ b/weed/storage/erasure_coding/ec_shards_info_test.go @@ -0,0 +1,366 @@ +package erasure_coding + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" +) + +func TestShardsInfo_SetAndGet(t *testing.T) { + si := NewShardsInfo() + + // Test setting shards + si.Set(ShardInfo{Id: 0, Size: 1000}) + si.Set(ShardInfo{Id: 5, Size: 2000}) + si.Set(ShardInfo{Id: 13, Size: 3000}) + + // Verify Has + if !si.Has(0) { + t.Error("Expected shard 0 to exist") + } + if !si.Has(5) { + t.Error("Expected shard 5 to exist") + } + if !si.Has(13) { + t.Error("Expected shard 13 to exist") + } + if si.Has(1) { + t.Error("Expected shard 1 to not exist") + } + + // Verify Size + if got := si.Size(0); got != 1000 { + t.Errorf("Expected size 1000, got %d", got) + } + if got := si.Size(5); got != 2000 { + t.Errorf("Expected size 2000, got %d", got) + } + if got := si.Size(13); got != 3000 { + t.Errorf("Expected size 3000, got %d", got) + } + + // Verify Count + if got := si.Count(); got != 3 { + t.Errorf("Expected count 3, got %d", got) + } + + // Verify Bitmap + expectedBitmap := uint32((1 << 0) | (1 << 5) | (1 << 13)) + if got := si.Bitmap(); got != expectedBitmap { + t.Errorf("Expected bitmap %b, got %b", expectedBitmap, got) + } +} + +func TestShardsInfo_SortedOrder(t *testing.T) { + si := NewShardsInfo() + + // Add shards in non-sequential order + si.Set(ShardInfo{Id: 10, Size: 1000}) + si.Set(ShardInfo{Id: 2, Size: 2000}) + si.Set(ShardInfo{Id: 7, Size: 3000}) + si.Set(ShardInfo{Id: 0, Size: 4000}) + + // Verify Ids returns sorted order + ids := si.Ids() + expected := []ShardId{0, 2, 7, 10} + if len(ids) != len(expected) { + t.Fatalf("Expected %d ids, got %d", len(expected), len(ids)) + } + for i, id := range ids { + if id != expected[i] { + t.Errorf("Expected id[%d]=%d, got %d", i, expected[i], id) + } + } +} + +func TestShardsInfo_Delete(t *testing.T) { + si := NewShardsInfo() + + si.Set(ShardInfo{Id: 0, Size: 1000}) + si.Set(ShardInfo{Id: 5, Size: 2000}) + si.Set(ShardInfo{Id: 10, Size: 3000}) + + // Delete middle shard + si.Delete(5) + + if si.Has(5) { + t.Error("Expected shard 5 to be deleted") + } + if !si.Has(0) || !si.Has(10) { + t.Error("Expected other shards to remain") + } + if got := si.Count(); got != 2 { + t.Errorf("Expected count 2, got %d", got) + } + + // Verify slice is still sorted + ids := si.Ids() + if len(ids) != 2 || ids[0] != 0 || ids[1] != 10 { + t.Errorf("Expected ids [0, 10], got %v", ids) + } +} + +func TestShardsInfo_Update(t *testing.T) { + si := NewShardsInfo() + + si.Set(ShardInfo{Id: 5, Size: 1000}) + + // Update existing shard + si.Set(ShardInfo{Id: 5, Size: 2000}) + + if got := si.Size(5); got != 2000 { + t.Errorf("Expected updated size 2000, got %d", got) + } + if got := si.Count(); got != 1 { + t.Errorf("Expected count to remain 1, got %d", got) + } +} + +func TestShardsInfo_TotalSize(t *testing.T) { + si := NewShardsInfo() + + si.Set(ShardInfo{Id: 0, Size: 1000}) + si.Set(ShardInfo{Id: 5, Size: 2000}) + si.Set(ShardInfo{Id: 10, Size: 3000}) + + expected := ShardSize(6000) + if got := si.TotalSize(); got != expected { + t.Errorf("Expected total size 
%d, got %d", expected, got) + } +} + +func TestShardsInfo_Sizes(t *testing.T) { + si := NewShardsInfo() + + si.Set(ShardInfo{Id: 2, Size: 100}) + si.Set(ShardInfo{Id: 5, Size: 200}) + si.Set(ShardInfo{Id: 8, Size: 300}) + + sizes := si.Sizes() + expected := []ShardSize{100, 200, 300} + + if len(sizes) != len(expected) { + t.Fatalf("Expected %d sizes, got %d", len(expected), len(sizes)) + } + for i, size := range sizes { + if size != expected[i] { + t.Errorf("Expected size[%d]=%d, got %d", i, expected[i], size) + } + } +} + +func TestShardsInfo_Copy(t *testing.T) { + si := NewShardsInfo() + si.Set(ShardInfo{Id: 0, Size: 1000}) + si.Set(ShardInfo{Id: 5, Size: 2000}) + + siCopy := si.Copy() + + // Verify copy has same data + if !siCopy.Has(0) || !siCopy.Has(5) { + t.Error("Copy should have same shards") + } + if siCopy.Size(0) != 1000 || siCopy.Size(5) != 2000 { + t.Error("Copy should have same sizes") + } + + // Modify original + si.Set(ShardInfo{Id: 10, Size: 3000}) + + // Verify copy is independent + if siCopy.Has(10) { + t.Error("Copy should be independent of original") + } +} + +func TestShardsInfo_AddSubtract(t *testing.T) { + si1 := NewShardsInfo() + si1.Set(ShardInfo{Id: 0, Size: 1000}) + si1.Set(ShardInfo{Id: 2, Size: 2000}) + + si2 := NewShardsInfo() + si2.Set(ShardInfo{Id: 2, Size: 9999}) // Different size + si2.Set(ShardInfo{Id: 5, Size: 3000}) + + // Test Add + si1.Add(si2) + if !si1.Has(0) || !si1.Has(2) || !si1.Has(5) { + t.Error("Add should merge shards") + } + if si1.Size(2) != 9999 { + t.Error("Add should update existing shard size") + } + + // Test Subtract + si1.Subtract(si2) + if si1.Has(2) || si1.Has(5) { + t.Error("Subtract should remove shards") + } + if !si1.Has(0) { + t.Error("Subtract should keep non-matching shards") + } +} + +func TestShardsInfo_PlusMinus(t *testing.T) { + si1 := NewShardsInfo() + si1.Set(ShardInfo{Id: 0, Size: 1000}) + si1.Set(ShardInfo{Id: 2, Size: 2000}) + + si2 := NewShardsInfo() + si2.Set(ShardInfo{Id: 2, Size: 2000}) + si2.Set(ShardInfo{Id: 5, Size: 3000}) + + // Test Plus + result := si1.Plus(si2) + if !result.Has(0) || !result.Has(2) || !result.Has(5) { + t.Error("Plus should merge into new instance") + } + if si1.Has(5) { + t.Error("Plus should not modify original") + } + + // Test Minus + result = si1.Minus(si2) + if !result.Has(0) || result.Has(2) { + t.Error("Minus should subtract into new instance") + } + if !si1.Has(2) { + t.Error("Minus should not modify original") + } +} + +func TestShardsInfo_DeleteParityShards(t *testing.T) { + si := NewShardsInfo() + + // Add data shards (0-9) + for i := 0; i < DataShardsCount; i++ { + si.Set(ShardInfo{Id: ShardId(i), Size: ShardSize((i + 1) * 1000)}) + } + + // Add parity shards (10-13) + for i := DataShardsCount; i < TotalShardsCount; i++ { + si.Set(ShardInfo{Id: ShardId(i), Size: ShardSize((i + 1) * 1000)}) + } + + si.DeleteParityShards() + + // Verify only data shards remain + for i := 0; i < DataShardsCount; i++ { + if !si.Has(ShardId(i)) { + t.Errorf("Expected data shard %d to remain", i) + } + } + for i := DataShardsCount; i < TotalShardsCount; i++ { + if si.Has(ShardId(i)) { + t.Errorf("Expected parity shard %d to be deleted", i) + } + } +} + +func TestShardsInfo_FromVolumeEcShardInformationMessage(t *testing.T) { + tests := []struct { + name string + msg *master_pb.VolumeEcShardInformationMessage + wantBits uint32 + wantSizes []int64 + }{ + { + name: "nil message", + msg: nil, + wantBits: 0, + wantSizes: []int64{}, + }, + { + name: "single shard", + msg: 
&master_pb.VolumeEcShardInformationMessage{ + EcIndexBits: 1 << 5, + ShardSizes: []int64{12345}, + }, + wantBits: 1 << 5, + wantSizes: []int64{12345}, + }, + { + name: "multiple shards", + msg: &master_pb.VolumeEcShardInformationMessage{ + EcIndexBits: (1 << 0) | (1 << 3) | (1 << 7), + ShardSizes: []int64{1000, 2000, 3000}, + }, + wantBits: (1 << 0) | (1 << 3) | (1 << 7), + wantSizes: []int64{1000, 2000, 3000}, + }, + { + name: "missing sizes", + msg: &master_pb.VolumeEcShardInformationMessage{ + EcIndexBits: (1 << 0) | (1 << 3), + ShardSizes: []int64{1000}, + }, + wantBits: (1 << 0) | (1 << 3), + wantSizes: []int64{1000, 0}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + si := ShardsInfoFromVolumeEcShardInformationMessage(tt.msg) + + if got := si.Bitmap(); got != tt.wantBits { + t.Errorf("Bitmap() = %b, want %b", got, tt.wantBits) + } + + if got := si.SizesInt64(); len(got) != len(tt.wantSizes) { + t.Errorf("SizesInt64() length = %d, want %d", len(got), len(tt.wantSizes)) + } else { + for i, size := range got { + if size != tt.wantSizes[i] { + t.Errorf("SizesInt64()[%d] = %d, want %d", i, size, tt.wantSizes[i]) + } + } + } + }) + } +} + +func TestShardsInfo_String(t *testing.T) { + si := NewShardsInfo() + si.Set(ShardInfo{Id: 0, Size: 1024}) + si.Set(ShardInfo{Id: 5, Size: 2048}) + + str := si.String() + if str == "" { + t.Error("String() should not be empty") + } + // Basic validation - should contain shard IDs + if len(str) < 3 { + t.Errorf("String() too short: %s", str) + } +} + +func BenchmarkShardsInfo_Set(b *testing.B) { + si := NewShardsInfo() + b.ResetTimer() + for i := 0; i < b.N; i++ { + si.Set(ShardInfo{Id: ShardId(i % TotalShardsCount), Size: ShardSize(i * 1000)}) + } +} + +func BenchmarkShardsInfo_Has(b *testing.B) { + si := NewShardsInfo() + for i := 0; i < TotalShardsCount; i++ { + si.Set(ShardInfo{Id: ShardId(i), Size: ShardSize(i * 1000)}) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + si.Has(ShardId(i % TotalShardsCount)) + } +} + +func BenchmarkShardsInfo_Size(b *testing.B) { + si := NewShardsInfo() + for i := 0; i < TotalShardsCount; i++ { + si.Set(ShardInfo{Id: ShardId(i), Size: ShardSize(i * 1000)}) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + si.Size(ShardId(i % TotalShardsCount)) + } +} diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index 8f2353472..fb585e5ab 100644 --- a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -256,7 +256,7 @@ func (ev *EcVolume) ToVolumeEcShardInformationMessage(diskId uint32) (messages [ // Update EC shard bits and sizes. 
si := ShardsInfoFromVolumeEcShardInformationMessage(m) - si.Set(s.ShardId, ShardSize(s.Size())) + si.Set(NewShardInfo(s.ShardId, ShardSize(s.Size()))) m.EcIndexBits = uint32(si.Bitmap()) m.ShardSizes = si.SizesInt64() } diff --git a/weed/storage/erasure_coding/ec_volume_info.go b/weed/storage/erasure_coding/ec_volume_info.go index ca98bb658..8a4359a53 100644 --- a/weed/storage/erasure_coding/ec_volume_info.go +++ b/weed/storage/erasure_coding/ec_volume_info.go @@ -1,265 +1,10 @@ package erasure_coding import ( - "fmt" - "sort" - - "github.com/dustin/go-humanize" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/storage/needle" ) -// ShardsInfo encapsulates information for EC shards -type ShardSize int64 -type ShardInfo struct { - Id ShardId - Size ShardSize -} -type ShardsInfo struct { - shards map[ShardId]*ShardInfo -} - -func NewShardsInfo() *ShardsInfo { - return &ShardsInfo{ - shards: map[ShardId]*ShardInfo{}, - } -} - -// Initializes a ShardsInfo from a ECVolume. -func ShardsInfoFromVolume(ev *EcVolume) *ShardsInfo { - res := &ShardsInfo{ - shards: map[ShardId]*ShardInfo{}, - } - for _, s := range ev.Shards { - res.Set(s.ShardId, ShardSize(s.Size())) - } - return res -} - -// Initializes a ShardsInfo from a VolumeEcShardInformationMessage proto. -func ShardsInfoFromVolumeEcShardInformationMessage(vi *master_pb.VolumeEcShardInformationMessage) *ShardsInfo { - res := NewShardsInfo() - if vi == nil { - return res - } - - var id ShardId - var j int - for bitmap := vi.EcIndexBits; bitmap != 0; bitmap >>= 1 { - if bitmap&1 != 0 { - var size ShardSize - if j < len(vi.ShardSizes) { - size = ShardSize(vi.ShardSizes[j]) - } - j++ - res.shards[id] = &ShardInfo{ - Id: id, - Size: size, - } - } - id++ - } - - return res -} - -// Returns a count of shards from a VolumeEcShardInformationMessage proto. -func ShardsCountFromVolumeEcShardInformationMessage(vi *master_pb.VolumeEcShardInformationMessage) int { - if vi == nil { - return 0 - } - - return ShardsInfoFromVolumeEcShardInformationMessage(vi).Count() -} - -// Returns a string representation for a ShardsInfo. -func (sp *ShardsInfo) String() string { - var res string - ids := sp.Ids() - for i, id := range sp.Ids() { - res += fmt.Sprintf("%d:%s", id, humanize.Bytes(uint64(sp.shards[id].Size))) - if i < len(ids)-1 { - res += " " - } - } - return res -} - -// AsSlice converts a ShardsInfo to a slice of ShardInfo structs, ordered by shard ID. -func (si *ShardsInfo) AsSlice() []*ShardInfo { - res := make([]*ShardInfo, len(si.shards)) - i := 0 - for _, id := range si.Ids() { - res[i] = si.shards[id] - i++ - } - - return res -} - -// Count returns the number of EC shards. -func (si *ShardsInfo) Count() int { - return len(si.shards) -} - -// Has verifies if a shard ID is present. -func (si *ShardsInfo) Has(id ShardId) bool { - _, ok := si.shards[id] - return ok -} - -// Ids returns a list of shard IDs, in ascending order. -func (si *ShardsInfo) Ids() []ShardId { - ids := []ShardId{} - for id := range si.shards { - ids = append(ids, id) - } - sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] }) - - return ids -} - -// IdsInt returns a list of shards ID as int, in ascending order. -func (si *ShardsInfo) IdsInt() []int { - ids := si.Ids() - res := make([]int, len(ids)) - for i, id := range ids { - res[i] = int(id) - } - - return res -} - -// Ids returns a list of shards ID as uint32, in ascending order. 
-func (si *ShardsInfo) IdsUint32() []uint32 { - return ShardIdsToUint32(si.Ids()) -} - -// Set sets the size for a given shard ID. -func (si *ShardsInfo) Set(id ShardId, size ShardSize) { - if id >= MaxShardCount { - return - } - si.shards[id] = &ShardInfo{ - Id: id, - Size: size, - } -} - -// Delete deletes a shard by ID. -func (si *ShardsInfo) Delete(id ShardId) { - if id >= MaxShardCount { - return - } - if _, ok := si.shards[id]; ok { - delete(si.shards, id) - } -} - -// Bitmap returns a bitmap for all existing shard IDs (bit 0 = shard #0... bit 31 = shard #31), in little endian. -func (si *ShardsInfo) Bitmap() uint32 { - var bits uint32 - for id := range si.shards { - bits |= (1 << id) - } - return bits -} - -// Size returns the size of a given shard ID, if present. -func (si *ShardsInfo) Size(id ShardId) ShardSize { - if s, ok := si.shards[id]; ok { - return s.Size - } - return 0 -} - -// TotalSize returns the size for all shards. -func (si *ShardsInfo) TotalSize() ShardSize { - var total ShardSize - for _, s := range si.shards { - total += s.Size - } - return total -} - -// Sizes returns a compact slice of present shard sizes, from first to last. -func (si *ShardsInfo) Sizes() []ShardSize { - ids := si.Ids() - - res := make([]ShardSize, len(ids)) - if len(res) != 0 { - var i int - for _, id := range ids { - res[i] = si.shards[id].Size - i++ - } - } - - return res -} - -// SizesInt64 returns a compact slice of present shard sizes, from first to last, as int64. -func (si *ShardsInfo) SizesInt64() []int64 { - res := make([]int64, si.Count()) - - for i, s := range si.Sizes() { - res[i] = int64(s) - } - return res -} - -// Copy creates a copy of a ShardInfo. -func (si *ShardsInfo) Copy() *ShardsInfo { - new := NewShardsInfo() - for _, s := range si.shards { - new.Set(s.Id, s.Size) - } - return new -} - -// DeleteParityShards removes party shards from a ShardInfo. -// Assumes default 10+4 EC layout where parity shards are IDs 10-13. -func (si *ShardsInfo) DeleteParityShards() { - for id := DataShardsCount; id < TotalShardsCount; id++ { - si.Delete(ShardId(id)) - } -} - -// MinusParityShards creates a ShardInfo copy, but with parity shards removed. -func (si *ShardsInfo) MinusParityShards() *ShardsInfo { - new := si.Copy() - new.DeleteParityShards() - return new -} - -// Add merges all shards from another ShardInfo into this one. -func (si *ShardsInfo) Add(other *ShardsInfo) { - for _, s := range other.shards { - si.Set(s.Id, s.Size) - } -} - -// Subtract removes all shards present on another ShardInfo. -func (si *ShardsInfo) Subtract(other *ShardsInfo) { - for _, s := range other.shards { - si.Delete(s.Id) - } -} - -// Plus returns a new ShardInfo consisting of (this + other). -func (si *ShardsInfo) Plus(other *ShardsInfo) *ShardsInfo { - new := si.Copy() - new.Add(other) - return new -} - -// Minus returns a new ShardInfo consisting of (this - other). 
-func (si *ShardsInfo) Minus(other *ShardsInfo) *ShardsInfo { - new := si.Copy() - new.Subtract(other) - return new -} - // data structure used in master type EcVolumeInfo struct { VolumeId needle.VolumeId diff --git a/weed/storage/erasure_coding/ec_volume_info_test.go b/weed/storage/erasure_coding/ec_volume_info_test.go index b942ebdcd..61fff54ce 100644 --- a/weed/storage/erasure_coding/ec_volume_info_test.go +++ b/weed/storage/erasure_coding/ec_volume_info_test.go @@ -11,7 +11,7 @@ import ( func TestShardsInfoDeleteParityShards(t *testing.T) { si := erasure_coding.NewShardsInfo() for _, id := range erasure_coding.AllShardIds() { - si.Set(id, 123) + si.Set(erasure_coding.ShardInfo{Id: id, Size: 123}) } si.DeleteParityShards() @@ -23,16 +23,16 @@ func TestShardsInfoDeleteParityShards(t *testing.T) { func TestShardsInfoAsSlice(t *testing.T) { si := erasure_coding.NewShardsInfo() - si.Set(5, 555) - si.Set(2, 222) - si.Set(7, 777) - si.Set(1, 111) + si.Set(erasure_coding.ShardInfo{Id: 5, Size: 555}) + si.Set(erasure_coding.ShardInfo{Id: 2, Size: 222}) + si.Set(erasure_coding.ShardInfo{Id: 7, Size: 777}) + si.Set(erasure_coding.ShardInfo{Id: 1, Size: 111}) - want := []*erasure_coding.ShardInfo{ - &erasure_coding.ShardInfo{Id: 1, Size: 111}, - &erasure_coding.ShardInfo{Id: 2, Size: 222}, - &erasure_coding.ShardInfo{Id: 5, Size: 555}, - &erasure_coding.ShardInfo{Id: 7, Size: 777}, + want := []erasure_coding.ShardInfo{ + {Id: 1, Size: 111}, + {Id: 2, Size: 222}, + {Id: 5, Size: 555}, + {Id: 7, Size: 777}, } if got := si.AsSlice(); !reflect.DeepEqual(got, want) { t.Errorf("expected %v, got %v", want, got) @@ -85,7 +85,7 @@ func TestShardsInfoSerialize(t *testing.T) { t.Run(tc.name, func(t *testing.T) { si := erasure_coding.NewShardsInfo() for id, size := range tc.shardIds { - si.Set(id, size) + si.Set(erasure_coding.ShardInfo{Id: id, Size: size}) } if got, want := si.Bitmap(), tc.wantBits; got != want { @@ -152,17 +152,17 @@ func TestShardsInfoFromVolumeEcShardInformationMessage(t *testing.T) { func TestShardsInfoCombine(t *testing.T) { a := erasure_coding.NewShardsInfo() - a.Set(1, 111) - a.Set(2, 222) - a.Set(3, 333) - a.Set(4, 444) - a.Set(5, 0) + a.Set(erasure_coding.ShardInfo{Id: 1, Size: 111}) + a.Set(erasure_coding.ShardInfo{Id: 2, Size: 222}) + a.Set(erasure_coding.ShardInfo{Id: 3, Size: 333}) + a.Set(erasure_coding.ShardInfo{Id: 4, Size: 444}) + a.Set(erasure_coding.ShardInfo{Id: 5, Size: 0}) b := erasure_coding.NewShardsInfo() - b.Set(1, 555) - b.Set(4, 666) - b.Set(5, 777) - b.Set(6, 888) + b.Set(erasure_coding.ShardInfo{Id: 1, Size: 555}) + b.Set(erasure_coding.ShardInfo{Id: 4, Size: 666}) + b.Set(erasure_coding.ShardInfo{Id: 5, Size: 777}) + b.Set(erasure_coding.ShardInfo{Id: 6, Size: 888}) if got, want := a.Plus(b).String(), "1:555 B 2:222 B 3:333 B 4:666 B 5:777 B 6:888 B"; got != want { t.Errorf("expected %q for plus, got %q", want, got) diff --git a/weed/storage/store.go b/weed/storage/store.go index b214b3533..8c8571cc8 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -101,12 +101,8 @@ func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int, diskId := uint32(i) // Track disk ID location.ecShardNotifyHandler = func(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume) { - var shardSize int64 - if shard, found := ecVolume.FindEcVolumeShard(shardId); found { - shardSize = shard.Size() - } si := erasure_coding.NewShardsInfo() - si.Set(shardId, erasure_coding.ShardSize(shardSize)) + 
si.Set(erasure_coding.NewShardInfo(shardId, erasure_coding.ShardSize(ecVolume.ShardSize()))) // Use non-blocking send during startup to avoid deadlock // The channel reader only starts after connecting to master, but we're loading during startup diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index 0d30128a6..f4c7bad02 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -54,7 +54,7 @@ func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId er glog.V(0).Infof("MountEcShards %d.%d on disk ID %d", vid, shardId, diskId) si := erasure_coding.NewShardsInfo() - si.Set(shardId, erasure_coding.ShardSize(ecVolume.ShardSize())) + si.Set(erasure_coding.NewShardInfo(shardId, erasure_coding.ShardSize(ecVolume.ShardSize()))) s.NewEcShardsChan <- master_pb.VolumeEcShardInformationMessage{ Id: uint32(vid), Collection: collection, @@ -82,7 +82,7 @@ func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.Shar } si := erasure_coding.NewShardsInfo() - si.Set(shardId, 0) + si.Set(erasure_coding.NewShardInfo(shardId, 0)) message := master_pb.VolumeEcShardInformationMessage{ Id: uint32(vid), Collection: ecShard.Collection, diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go index 3de358c4d..f1ac3f626 100644 --- a/weed/worker/tasks/erasure_coding/detection.go +++ b/weed/worker/tasks/erasure_coding/detection.go @@ -503,7 +503,7 @@ func diskInfosToCandidates(disks []*topology.DiskInfo) []*placement.DiskCandidat if disk.DiskInfo.EcShardInfos != nil { for _, shardInfo := range disk.DiskInfo.EcShardInfos { if shardInfo.DiskId == disk.DiskID { - ecShardCount += erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(shardInfo) + ecShardCount += erasure_coding.GetShardCount(shardInfo) } } }
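
As a quick orientation to the API introduced above: the refactor replaces the old map-backed ShardsInfo, whose Set took an (id, size) pair, with a sorted slice plus a ShardBits bitmap guarded by a sync.RWMutex, and renames ShardsCountFromVolumeEcShardInformationMessage to GetShardCount. Below is a minimal, illustrative sketch of how callers use the new types; it assumes the standard SeaweedFS import paths and is not code from this change.

    package main

    import (
    	"fmt"

    	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
    	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
    )

    func main() {
    	// Register a few shards; Set now takes a ShardInfo value built via NewShardInfo.
    	si := erasure_coding.NewShardsInfo()
    	si.Set(erasure_coding.NewShardInfo(0, 1000))
    	si.Set(erasure_coding.NewShardInfo(5, 2000))
    	si.Set(erasure_coding.NewShardInfo(13, 3000))

    	fmt.Println(si.Count(), si.Has(5), si.Ids()) // 3 true [0 5 13]

    	// Serialize the same way the shell and storage code in this diff does.
    	msg := &master_pb.VolumeEcShardInformationMessage{
    		Id:          42,
    		EcIndexBits: si.Bitmap(),
    		ShardSizes:  si.SizesInt64(),
    	}

    	// GetShardCount counts bits directly, replacing
    	// ShardsCountFromVolumeEcShardInformationMessage.
    	fmt.Println(erasure_coding.GetShardCount(msg)) // 3

    	// Round-trip to recover per-shard sizes.
    	back := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(msg)
    	fmt.Println(back.Size(13), back.TotalSize()) // 3000 6000
    }

The bitmap makes Count and Has cheap on the heartbeat and rebalance paths, while the sorted slice keeps Ids, Sizes, and SizesInt64 deterministic; call sites that previously wrote si.Set(id, size) now write si.Set(erasure_coding.NewShardInfo(id, size)).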