From 6b98b52accb1d0737ebc648a2c6a4ac80bf1e65b Mon Sep 17 00:00:00 2001
From: Lisandro Pin
Date: Mon, 29 Dec 2025 04:30:42 +0100
Subject: [PATCH] Fix reporting of EC shard sizes from nodes to masters. (#7835)

SeaweedFS tracks EC shard sizes in topology data structures, but this information is never relayed to master servers. The end result is that commands reporting disk usage, such as `volume.list` and `cluster.status`, yield incorrect figures when EC shards are present.

As an example, for a simple 5-node test cluster, before...

```
> volume.list
Topology volumeSizeLimit:30000 MB hdd(volume:6/40 active:6 free:33 remote:0)
  DataCenter DefaultDataCenter hdd(volume:6/40 active:6 free:33 remote:0)
    Rack DefaultRack hdd(volume:6/40 active:6 free:33 remote:0)
      DataNode 192.168.10.111:9001 hdd(volume:1/8 active:1 free:7 remote:0)
        Disk hdd(volume:1/8 active:1 free:7 remote:0) id:0
          volume id:3 size:88967096 file_count:172 replica_placement:2 version:3 modified_at_second:1766349617
          ec volume id:1 collection: shards:[1 5]
        Disk hdd total size:88967096 file_count:172
      DataNode 192.168.10.111:9001 total size:88967096 file_count:172
  DataCenter DefaultDataCenter hdd(volume:6/40 active:6 free:33 remote:0)
    Rack DefaultRack hdd(volume:6/40 active:6 free:33 remote:0)
      DataNode 192.168.10.111:9002 hdd(volume:2/8 active:2 free:6 remote:0)
        Disk hdd(volume:2/8 active:2 free:6 remote:0) id:0
          volume id:2 size:77267536 file_count:166 replica_placement:2 version:3 modified_at_second:1766349617
          volume id:3 size:88967096 file_count:172 replica_placement:2 version:3 modified_at_second:1766349617
          ec volume id:1 collection: shards:[0 4]
        Disk hdd total size:166234632 file_count:338
      DataNode 192.168.10.111:9002 total size:166234632 file_count:338
  DataCenter DefaultDataCenter hdd(volume:6/40 active:6 free:33 remote:0)
    Rack DefaultRack hdd(volume:6/40 active:6 free:33 remote:0)
      DataNode 192.168.10.111:9003 hdd(volume:1/8 active:1 free:7 remote:0)
        Disk hdd(volume:1/8 active:1 free:7 remote:0) id:0
          volume id:2 size:77267536 file_count:166 replica_placement:2 version:3 modified_at_second:1766349617
          ec volume id:1 collection: shards:[2 6]
        Disk hdd total size:77267536 file_count:166
      DataNode 192.168.10.111:9003 total size:77267536 file_count:166
  DataCenter DefaultDataCenter hdd(volume:6/40 active:6 free:33 remote:0)
    Rack DefaultRack hdd(volume:6/40 active:6 free:33 remote:0)
      DataNode 192.168.10.111:9004 hdd(volume:2/8 active:2 free:6 remote:0)
        Disk hdd(volume:2/8 active:2 free:6 remote:0) id:0
          volume id:2 size:77267536 file_count:166 replica_placement:2 version:3 modified_at_second:1766349617
          volume id:3 size:88967096 file_count:172 replica_placement:2 version:3 modified_at_second:1766349617
          ec volume id:1 collection: shards:[3 7]
        Disk hdd total size:166234632 file_count:338
      DataNode 192.168.10.111:9004 total size:166234632 file_count:338
  DataCenter DefaultDataCenter hdd(volume:6/40 active:6 free:33 remote:0)
    Rack DefaultRack hdd(volume:6/40 active:6 free:33 remote:0)
      DataNode 192.168.10.111:9005 hdd(volume:0/8 active:0 free:8 remote:0)
        Disk hdd(volume:0/8 active:0 free:8 remote:0) id:0
          ec volume id:1 collection: shards:[8 9 10 11 12 13]
        Disk hdd total size:0 file_count:0
    Rack DefaultRack total size:498703896 file_count:1014
  DataCenter DefaultDataCenter total size:498703896 file_count:1014
total size:498703896 file_count:1014
```

...and after:

```
> volume.list
Topology volumeSizeLimit:30000 MB hdd(volume:6/40 active:6 free:33 remote:0)
  DataCenter DefaultDataCenter hdd(volume:6/40 active:6 free:33 remote:0)
    Rack DefaultRack hdd(volume:6/40 active:6 free:33 remote:0)
      DataNode 192.168.10.111:9001 hdd(volume:1/8 active:1 free:7 remote:0)
        Disk hdd(volume:1/8 active:1 free:7 remote:0) id:0
          volume id:2 size:81761800 file_count:161 replica_placement:2 version:3 modified_at_second:1766349495
          ec volume id:1 collection: shards:[1 5 9] sizes:[1:8.00 MiB 5:8.00 MiB 9:8.00 MiB] total:24.00 MiB
        Disk hdd total size:81761800 file_count:161
      DataNode 192.168.10.111:9001 total size:81761800 file_count:161
  DataCenter DefaultDataCenter hdd(volume:6/40 active:6 free:33 remote:0)
    Rack DefaultRack hdd(volume:6/40 active:6 free:33 remote:0)
      DataNode 192.168.10.111:9002 hdd(volume:1/8 active:1 free:7 remote:0)
        Disk hdd(volume:1/8 active:1 free:7 remote:0) id:0
          volume id:3 size:88678712 file_count:170 replica_placement:2 version:3 modified_at_second:1766349495
          ec volume id:1 collection: shards:[11 12 13] sizes:[11:8.00 MiB 12:8.00 MiB 13:8.00 MiB] total:24.00 MiB
        Disk hdd total size:88678712 file_count:170
      DataNode 192.168.10.111:9002 total size:88678712 file_count:170
  DataCenter DefaultDataCenter hdd(volume:6/40 active:6 free:33 remote:0)
    Rack DefaultRack hdd(volume:6/40 active:6 free:33 remote:0)
      DataNode 192.168.10.111:9003 hdd(volume:2/8 active:2 free:6 remote:0)
        Disk hdd(volume:2/8 active:2 free:6 remote:0) id:0
          volume id:2 size:81761800 file_count:161 replica_placement:2 version:3 modified_at_second:1766349495
          volume id:3 size:88678712 file_count:170 replica_placement:2 version:3 modified_at_second:1766349495
          ec volume id:1 collection: shards:[0 4 8] sizes:[0:8.00 MiB 4:8.00 MiB 8:8.00 MiB] total:24.00 MiB
        Disk hdd total size:170440512 file_count:331
      DataNode 192.168.10.111:9003 total size:170440512 file_count:331
  DataCenter DefaultDataCenter hdd(volume:6/40 active:6 free:33 remote:0)
    Rack DefaultRack hdd(volume:6/40 active:6 free:33 remote:0)
      DataNode 192.168.10.111:9004 hdd(volume:2/8 active:2 free:6 remote:0)
        Disk hdd(volume:2/8 active:2 free:6 remote:0) id:0
          volume id:2 size:81761800 file_count:161 replica_placement:2 version:3 modified_at_second:1766349495
          volume id:3 size:88678712 file_count:170 replica_placement:2 version:3 modified_at_second:1766349495
          ec volume id:1 collection: shards:[2 6 10] sizes:[2:8.00 MiB 6:8.00 MiB 10:8.00 MiB] total:24.00 MiB
        Disk hdd total size:170440512 file_count:331
      DataNode 192.168.10.111:9004 total size:170440512 file_count:331
  DataCenter DefaultDataCenter hdd(volume:6/40 active:6 free:33 remote:0)
    Rack DefaultRack hdd(volume:6/40 active:6 free:33 remote:0)
      DataNode 192.168.10.111:9005 hdd(volume:0/8 active:0 free:8 remote:0)
        Disk hdd(volume:0/8 active:0 free:8 remote:0) id:0
          ec volume id:1 collection: shards:[3 7] sizes:[3:8.00 MiB 7:8.00 MiB] total:16.00 MiB
        Disk hdd total size:0 file_count:0
    Rack DefaultRack total size:511321536 file_count:993
  DataCenter DefaultDataCenter total size:511321536 file_count:993
total size:511321536 file_count:993
```
---
 weed/admin/dash/volume_management.go | 17 +- weed/server/volume_grpc_client_to_master.go | 8 +- weed/server/volume_grpc_erasure_coding.go | 7 +- weed/shell/command_cluster_status.go | 2 +- weed/shell/command_ec_common.go | 211 +++++----- weed/shell/command_ec_decode.go | 81 ++-- weed/shell/command_ec_encode.go | 5 +- weed/shell/command_ec_rebuild.go | 26 +- weed/shell/command_ec_rebuild_test.go | 34 +- weed/shell/command_ec_test.go | 104 ++--- weed/shell/command_volume_balance.go | 4 +- weed/shell/command_volume_list.go | 15 +- weed/shell/command_volume_list_test.go | 8 +- weed/shell/command_volume_server_evacuate.go | 4 +-
weed/shell/ec_proportional_rebalance.go | 21 +- weed/shell/ec_rebalance_slots_test.go | 15 +- weed/storage/erasure_coding/ec_shard.go | 27 ++ .../erasure_coding/ec_shard_size_helper.go | 68 --- .../ec_shard_size_helper_test.go | 117 ------ weed/storage/erasure_coding/ec_volume.go | 42 +- weed/storage/erasure_coding/ec_volume_info.go | 388 ++++++++++-------- .../erasure_coding/ec_volume_info_test.go | 173 ++++++++ weed/storage/store_ec.go | 14 +- weed/topology/data_node_ec.go | 13 +- weed/topology/disk_ec.go | 29 +- weed/topology/topology_ec.go | 27 +- weed/topology/topology_test.go | 2 + weed/worker/tasks/erasure_coding/detection.go | 2 +- 28 files changed, 746 insertions(+), 718 deletions(-) delete mode 100644 weed/storage/erasure_coding/ec_shard_size_helper.go delete mode 100644 weed/storage/erasure_coding/ec_shard_size_helper_test.go create mode 100644 weed/storage/erasure_coding/ec_volume_info_test.go diff --git a/weed/admin/dash/volume_management.go b/weed/admin/dash/volume_management.go index 6075f03ff..4009922bb 100644 --- a/weed/admin/dash/volume_management.go +++ b/weed/admin/dash/volume_management.go @@ -492,20 +492,17 @@ func (s *AdminServer) GetClusterVolumeServers() (*ClusterVolumeServersData, erro // Merge EcIndexBits from all disks and collect shard sizes allShardSizes := make(map[erasure_coding.ShardId]int64) for _, ecShardInfo := range ecShardInfos { - ecInfo.EcIndexBits |= ecShardInfo.EcIndexBits + si := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(ecShardInfo) + ecInfo.EcIndexBits |= si.Bitmap() // Collect shard sizes from this disk - shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits) - shardBits.EachSetIndex(func(shardId erasure_coding.ShardId) { - if size, found := erasure_coding.GetShardSize(ecShardInfo, shardId); found { - allShardSizes[shardId] = size - } - }) + for _, id := range si.Ids() { + allShardSizes[id] += int64(si.Size(id)) + } } // Process final merged shard information - finalShardBits := erasure_coding.ShardBits(ecInfo.EcIndexBits) - finalShardBits.EachSetIndex(func(shardId erasure_coding.ShardId) { + for shardId := range allShardSizes { ecInfo.ShardCount++ ecInfo.ShardNumbers = append(ecInfo.ShardNumbers, int(shardId)) vs.EcShards++ @@ -516,7 +513,7 @@ func (s *AdminServer) GetClusterVolumeServers() (*ClusterVolumeServersData, erro ecInfo.TotalSize += shardSize vs.DiskUsage += shardSize // Add EC shard size to total disk usage } - }) + } ecVolumeMap[volumeId] = ecInfo } diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index 5022a9ede..fecab4894 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -237,8 +237,8 @@ func (vs *VolumeServer) doHeartbeatWithRetry(masterAddress pb.ServerAddress, grp &ecShardMessage, // ecShardMessage is already a copy from the channel receive }, } - glog.V(0).Infof("volume server %s:%d adds ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id, - erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds()) + si := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(&ecShardMessage) + glog.V(0).Infof("volume server %s:%d adds ec shards to %d [%s]", vs.store.Ip, vs.store.Port, ecShardMessage.Id, si.String()) if err = stream.Send(deltaBeat); err != nil { glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err) return "", err @@ -268,8 +268,8 @@ func (vs *VolumeServer) doHeartbeatWithRetry(masterAddress pb.ServerAddress, grp &ecShardMessage, 
// ecShardMessage is already a copy from the channel receive }, } - glog.V(0).Infof("volume server %s:%d deletes ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id, - erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds()) + si := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(&ecShardMessage) + glog.V(0).Infof("volume server %s:%d deletes ec shards from %d [%s]", vs.store.Ip, vs.store.Port, ecShardMessage.Id, si.String()) if err = stream.Send(deltaBeat); err != nil { glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err) return "", err diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index ec59ffa39..97abba98f 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -548,11 +548,10 @@ func (vs *VolumeServer) VolumeEcShardsInfo(ctx context.Context, req *volume_serv for _, location := range vs.store.Locations { if v, found := location.FindEcVolume(needle.VolumeId(req.VolumeId)); found { // Get shard details from the EC volume - shardDetails := v.ShardDetails() - for _, shardDetail := range shardDetails { + for _, si := range erasure_coding.ShardsInfoFromVolume(v).AsSlice() { ecShardInfo := &volume_server_pb.EcShardInfo{ - ShardId: uint32(shardDetail.ShardId), - Size: int64(shardDetail.Size), + ShardId: uint32(si.Id), + Size: int64(si.Size), Collection: v.Collection, } ecShardInfos = append(ecShardInfos, ecShardInfo) diff --git a/weed/shell/command_cluster_status.go b/weed/shell/command_cluster_status.go index 746ca4863..1070c257b 100644 --- a/weed/shell/command_cluster_status.go +++ b/weed/shell/command_cluster_status.go @@ -310,7 +310,7 @@ func (sp *ClusterStatusPrinter) printVolumeInfo() { for _, eci := range di.EcShardInfos { vid := needle.VolumeId(eci.Id) ecVolumeIds[vid] = true - ecShards += erasure_coding.ShardBits(eci.EcIndexBits).ShardIdCount() + ecShards += erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(eci) } } } diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index 6f9543ca4..82d661289 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -32,8 +32,8 @@ type EcDisk struct { diskType string freeEcSlots int ecShardCount int // Total EC shards on this disk - // Map of volumeId -> shardBits for shards on this disk - ecShards map[needle.VolumeId]erasure_coding.ShardBits + // Map of volumeId -> ShardsInfo for shards on this disk + ecShards map[needle.VolumeId]*erasure_coding.ShardsInfo } type EcNode struct { @@ -277,14 +277,14 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, return fmt.Errorf("lock is lost") } - copiedShardIds := []uint32{uint32(shardId)} + copiedShardIds := []erasure_coding.ShardId{shardId} if applyBalancing { existingServerAddress := pb.NewServerAddressFromDataNode(existingLocation.info) // ask destination node to copy shard and the ecx file from source node, and mount it - copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress, destDiskId) + copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []erasure_coding.ShardId{shardId}, vid, collection, existingServerAddress, destDiskId) if err != nil { return err } @@ -317,8 +317,8 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, } 
func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption, - targetServer *EcNode, shardIdsToCopy []uint32, - volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress, destDiskId uint32) (copiedShardIds []uint32, err error) { + targetServer *EcNode, shardIdsToCopy []erasure_coding.ShardId, + volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress, destDiskId uint32) (copiedShardIds []erasure_coding.ShardId, err error) { fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id) @@ -330,7 +330,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption, _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{ VolumeId: uint32(volumeId), Collection: collection, - ShardIds: shardIdsToCopy, + ShardIds: erasure_coding.ShardIdsToUint32(shardIdsToCopy), CopyEcxFile: true, CopyEcjFile: true, CopyVifFile: true, @@ -346,7 +346,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption, _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{ VolumeId: uint32(volumeId), Collection: collection, - ShardIds: shardIdsToCopy, + ShardIds: erasure_coding.ShardIdsToUint32(shardIdsToCopy), }) if mountErr != nil { return fmt.Errorf("mount %d.%v on %s : %v\n", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr) @@ -414,9 +414,8 @@ func swap(data []*CandidateEcNode, i, j int) { } func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) { - for _, ecShardInfo := range ecShardInfos { - shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits) - count += shardBits.ShardIdCount() + for _, eci := range ecShardInfos { + count += erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(eci) } return } @@ -440,10 +439,9 @@ func countFreeShardSlots(dn *master_pb.DataNodeInfo, diskType types.DiskType) (c func (ecNode *EcNode) localShardIdCount(vid uint32) int { for _, diskInfo := range ecNode.info.DiskInfos { - for _, ecShardInfo := range diskInfo.EcShardInfos { - if vid == ecShardInfo.Id { - shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits) - return shardBits.ShardIdCount() + for _, eci := range diskInfo.EcShardInfos { + if vid == eci.Id { + return erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(eci) } } } @@ -483,18 +481,18 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter } // Group EC shards by disk_id - diskShards := make(map[uint32]map[needle.VolumeId]erasure_coding.ShardBits) + diskShards := make(map[uint32]map[needle.VolumeId]*erasure_coding.ShardsInfo) for _, diskInfo := range dn.DiskInfos { if diskInfo == nil { continue } - for _, ecShardInfo := range diskInfo.EcShardInfos { - diskId := ecShardInfo.DiskId + for _, eci := range diskInfo.EcShardInfos { + diskId := eci.DiskId if diskShards[diskId] == nil { - diskShards[diskId] = make(map[needle.VolumeId]erasure_coding.ShardBits) + diskShards[diskId] = make(map[needle.VolumeId]*erasure_coding.ShardsInfo) } - vid := needle.VolumeId(ecShardInfo.Id) - diskShards[diskId][vid] = erasure_coding.ShardBits(ecShardInfo.EcIndexBits) + vid := needle.VolumeId(eci.Id) + diskShards[diskId][vid] = erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(eci) } } @@ -508,11 +506,11 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter for diskId, diskTypeStr := range 
allDiskIds { shards := diskShards[diskId] if shards == nil { - shards = make(map[needle.VolumeId]erasure_coding.ShardBits) + shards = make(map[needle.VolumeId]*erasure_coding.ShardsInfo) } totalShardCount := 0 - for _, shardBits := range shards { - totalShardCount += shardBits.ShardIdCount() + for _, shardsInfo := range shards { + totalShardCount += shardsInfo.Count() } ecNode.disks[diskId] = &EcDisk{ @@ -530,7 +528,7 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter return } -func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeDeletedShardIds []uint32) error { +func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeDeletedShardIds []erasure_coding.ShardId) error { fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation) @@ -538,27 +536,27 @@ func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection strin _, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{ VolumeId: uint32(volumeId), Collection: collection, - ShardIds: toBeDeletedShardIds, + ShardIds: erasure_coding.ShardIdsToUint32(toBeDeletedShardIds), }) return deleteErr }) } -func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeUnmountedhardIds []uint32) error { +func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeUnmountedhardIds []erasure_coding.ShardId) error { fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation) return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{ VolumeId: uint32(volumeId), - ShardIds: toBeUnmountedhardIds, + ShardIds: erasure_coding.ShardIdsToUint32(toBeUnmountedhardIds), }) return deleteErr }) } -func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeMountedhardIds []uint32) error { +func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeMountedhardIds []erasure_coding.ShardId) error { fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation) @@ -566,7 +564,7 @@ func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId n _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{ VolumeId: uint32(volumeId), Collection: collection, - ShardIds: toBeMountedhardIds, + ShardIds: erasure_coding.ShardIdsToUint32(toBeMountedhardIds), }) return mountErr }) @@ -580,33 +578,35 @@ func ceilDivide(a, b int) int { return (a / b) + r } -func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId, diskType types.DiskType) erasure_coding.ShardBits { - +func findEcVolumeShardsInfo(ecNode *EcNode, vid needle.VolumeId, diskType types.DiskType) *erasure_coding.ShardsInfo { if diskInfo, found := ecNode.info.DiskInfos[string(diskType)]; found { for _, shardInfo := range diskInfo.EcShardInfos { if needle.VolumeId(shardInfo.Id) == vid { - return 
erasure_coding.ShardBits(shardInfo.EcIndexBits) + return erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(shardInfo) } } } - return 0 + // Returns an empty ShardsInfo struct on failure, to avoid potential nil dereferences. + return erasure_coding.NewShardsInfo() } -func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32, diskType types.DiskType) *EcNode { +// TODO: simplify me +func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []erasure_coding.ShardId, diskType types.DiskType) *EcNode { foundVolume := false diskInfo, found := ecNode.info.DiskInfos[string(diskType)] if found { - for _, shardInfo := range diskInfo.EcShardInfos { - if needle.VolumeId(shardInfo.Id) == vid { - oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits) - newShardBits := oldShardBits + for _, ecsi := range diskInfo.EcShardInfos { + if needle.VolumeId(ecsi.Id) == vid { + si := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(ecsi) + oldShardCount := si.Count() for _, shardId := range shardIds { - newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId)) + si.Set(shardId, 0) } - shardInfo.EcIndexBits = uint32(newShardBits) - ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount() + ecsi.EcIndexBits = si.Bitmap() + ecsi.ShardSizes = si.SizesInt64() + ecNode.freeEcSlot -= si.Count() - oldShardCount foundVolume = true break } @@ -619,34 +619,36 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, } if !foundVolume { - var newShardBits erasure_coding.ShardBits - for _, shardId := range shardIds { - newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId)) + si := erasure_coding.NewShardsInfo() + for _, id := range shardIds { + si.Set(id, 0) } diskInfo.EcShardInfos = append(diskInfo.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{ Id: uint32(vid), Collection: collection, - EcIndexBits: uint32(newShardBits), + EcIndexBits: si.Bitmap(), + ShardSizes: si.SizesInt64(), DiskType: string(diskType), }) - ecNode.freeEcSlot -= len(shardIds) + ecNode.freeEcSlot -= si.Count() } return ecNode } -func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32, diskType types.DiskType) *EcNode { +func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []erasure_coding.ShardId, diskType types.DiskType) *EcNode { if diskInfo, found := ecNode.info.DiskInfos[string(diskType)]; found { - for _, shardInfo := range diskInfo.EcShardInfos { - if needle.VolumeId(shardInfo.Id) == vid { - oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits) - newShardBits := oldShardBits + for _, eci := range diskInfo.EcShardInfos { + if needle.VolumeId(eci.Id) == vid { + si := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(eci) + oldCount := si.Count() for _, shardId := range shardIds { - newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId)) + si.Delete(shardId) } - shardInfo.EcIndexBits = uint32(newShardBits) - ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount() + eci.EcIndexBits = si.Bitmap() + eci.ShardSizes = si.SizesInt64() + ecNode.freeEcSlot -= si.Count() - oldCount } } } @@ -754,8 +756,8 @@ func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.Volum // Use MaxShardCount (32) to support custom EC ratios shardToLocations := make([][]*EcNode, erasure_coding.MaxShardCount) for _, ecNode := range locations { - shardBits := 
findEcVolumeShards(ecNode, vid, ecb.diskType) - for _, shardId := range shardBits.ShardIds() { + si := findEcVolumeShardsInfo(ecNode, vid, ecb.diskType) + for _, shardId := range si.Ids() { shardToLocations[shardId] = append(shardToLocations[shardId], ecNode) } } @@ -769,7 +771,7 @@ func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.Volum continue } - duplicatedShardIds := []uint32{uint32(shardId)} + duplicatedShardIds := []erasure_coding.ShardId{erasure_coding.ShardId(shardId)} for _, ecNode := range ecNodes[1:] { if err := unmountEcShards(ecb.commandEnv.option.GrpcDialOption, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil { return err @@ -799,8 +801,11 @@ func (ecb *ecBalancer) balanceEcShardsAcrossRacks(collection string) error { func countShardsByRack(vid needle.VolumeId, locations []*EcNode, diskType types.DiskType) map[string]int { return groupByCount(locations, func(ecNode *EcNode) (id string, count int) { - shardBits := findEcVolumeShards(ecNode, vid, diskType) - return string(ecNode.rack), shardBits.ShardIdCount() + id = string(ecNode.rack) + if si := findEcVolumeShardsInfo(ecNode, vid, diskType); si != nil { + count = si.Count() + } + return }) } @@ -809,9 +814,9 @@ func shardsByTypePerRack(vid needle.VolumeId, locations []*EcNode, diskType type dataPerRack = make(map[string][]erasure_coding.ShardId) parityPerRack = make(map[string][]erasure_coding.ShardId) for _, ecNode := range locations { - shardBits := findEcVolumeShards(ecNode, vid, diskType) + si := findEcVolumeShardsInfo(ecNode, vid, diskType) rackId := string(ecNode.rack) - for _, shardId := range shardBits.ShardIds() { + for _, shardId := range si.Ids() { if int(shardId) < dataShards { dataPerRack[rackId] = append(dataPerRack[rackId], shardId) } else { @@ -891,8 +896,8 @@ func (ecb *ecBalancer) balanceShardTypeAcrossRacks( shardId := shards[i] // Find which node has this shard for _, ecNode := range ecNodes { - shardBits := findEcVolumeShards(ecNode, vid, ecb.diskType) - if shardBits.HasShardId(shardId) { + si := findEcVolumeShardsInfo(ecNode, vid, ecb.diskType) + if si.Has(shardId) { shardsToMove[shardId] = ecNode break } @@ -1052,10 +1057,10 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error { func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode) error { for _, ecNode := range existingLocations { - shardBits := findEcVolumeShards(ecNode, vid, ecb.diskType) - overLimitCount := shardBits.ShardIdCount() - averageShardsPerEcNode + si := findEcVolumeShardsInfo(ecNode, vid, ecb.diskType) + overLimitCount := si.Count() - averageShardsPerEcNode - for _, shardId := range shardBits.ShardIds() { + for _, shardId := range si.Ids() { if overLimitCount <= 0 { break @@ -1102,7 +1107,7 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error { return } for _, ecShardInfo := range diskInfo.EcShardInfos { - count += erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIdCount() + count += erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(ecShardInfo) } return ecNode.info.Id, count }) @@ -1132,30 +1137,32 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error { } if fullDiskInfo, found := fullNode.info.DiskInfos[string(ecb.diskType)]; found { for _, shards := range fullDiskInfo.EcShardInfos { - if _, found := emptyNodeIds[shards.Id]; !found { - for _, shardId := range 
erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() { - vid := needle.VolumeId(shards.Id) - // For balancing, strictly require matching disk type - destDiskId := pickBestDiskOnNode(emptyNode, vid, ecb.diskType, true) - - if destDiskId > 0 { - fmt.Printf("%s moves ec shards %d.%d to %s (disk %d)\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id, destDiskId) - } else { - fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id) - } - - err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing, ecb.diskType) - if err != nil { - return err - } + if _, found := emptyNodeIds[shards.Id]; found { + continue + } + si := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(shards) + for _, shardId := range si.Ids() { + vid := needle.VolumeId(shards.Id) + // For balancing, strictly require matching disk type + destDiskId := pickBestDiskOnNode(emptyNode, vid, ecb.diskType, true) + + if destDiskId > 0 { + fmt.Printf("%s moves ec shards %d.%d to %s (disk %d)\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id, destDiskId) + } else { + fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id) + } - ecNodeIdToShardCount[emptyNode.info.Id]++ - ecNodeIdToShardCount[fullNode.info.Id]-- - hasMove = true - break + err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing, ecb.diskType) + if err != nil { + return err } + + ecNodeIdToShardCount[emptyNode.info.Id]++ + ecNodeIdToShardCount[fullNode.info.Id]-- + hasMove = true break } + break } } } @@ -1174,7 +1181,11 @@ func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existi nodeShards := map[*EcNode]int{} for _, node := range possibleDestinations { - nodeShards[node] = findEcVolumeShards(node, vid, ecb.diskType).ShardIdCount() + count := 0 + if si := findEcVolumeShardsInfo(node, vid, ecb.diskType); si != nil { + count = si.Count() + } + nodeShards[node] = count } targets := []*EcNode{} @@ -1244,8 +1255,8 @@ func diskDistributionScore(ecNode *EcNode, vid needle.VolumeId) int { // Lower total means more room for new shards score := 0 for _, disk := range ecNode.disks { - if shardBits, ok := disk.ecShards[vid]; ok { - score += shardBits.ShardIdCount() * 10 // Weight shards of this volume heavily + if si, ok := disk.ecShards[vid]; ok { + score += si.Count() * 10 // Weight shards of this volume heavily } score += disk.ecShardCount // Also consider total shards on disk } @@ -1272,8 +1283,8 @@ func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId, diskType types.Disk // Check if this volume already has shards on this disk existingShards := 0 - if shardBits, ok := disk.ecShards[vid]; ok { - existingShards = shardBits.ShardIdCount() + if si, ok := disk.ecShards[vid]; ok { + existingShards = si.Count() } // Score: prefer disks with fewer total shards and fewer shards of this volume @@ -1333,11 +1344,11 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int, disk picked := make(map[erasure_coding.ShardId]*EcNode) var candidateEcNodes []*CandidateEcNode for _, ecNode := range ecNodes { - shardBits := findEcVolumeShards(ecNode, vid, diskType) - if shardBits.ShardIdCount() > 0 { + si := findEcVolumeShardsInfo(ecNode, vid, diskType) + if si.Count() > 0 { candidateEcNodes = append(candidateEcNodes, &CandidateEcNode{ ecNode: ecNode, - shardCount: 
shardBits.ShardIdCount(), + shardCount: si.Count(), }) } } @@ -1347,13 +1358,13 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int, disk for i := 0; i < n; i++ { selectedEcNodeIndex := -1 for i, candidateEcNode := range candidateEcNodes { - shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid, diskType) - if shardBits > 0 { + si := findEcVolumeShardsInfo(candidateEcNode.ecNode, vid, diskType) + if si.Count() > 0 { selectedEcNodeIndex = i - for _, shardId := range shardBits.ShardIds() { + for _, shardId := range si.Ids() { candidateEcNode.shardCount-- picked[shardId] = candidateEcNode.ecNode - candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)}, diskType) + candidateEcNode.ecNode.deleteEcVolumeShards(vid, []erasure_coding.ShardId{shardId}, diskType) break } break diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index a9e5c9113..8f04a7bf0 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -108,12 +108,12 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec } // find volume location - nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid, diskType) + nodeToEcShardsInfo := collectEcNodeShardsInfo(topoInfo, vid, diskType) - fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcIndexBits) + fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcShardsInfo) // collect ec shards to the server with most space - targetNodeLocation, err := collectEcShards(commandEnv, nodeToEcIndexBits, collection, vid) + targetNodeLocation, err := collectEcShards(commandEnv, nodeToEcShardsInfo, collection, vid) if err != nil { return fmt.Errorf("collectEcShards for volume %d: %v", vid, err) } @@ -124,13 +124,13 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec // Special case: if the EC index has no live entries, decoding is a no-op. // Just purge EC shards and return success without generating/mounting an empty volume. 
if isEcDecodeEmptyVolumeErr(err) { - return unmountAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, nodeToEcIndexBits, vid) + return unmountAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, nodeToEcShardsInfo, vid) } return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err) } // delete the previous ec shards - err = mountVolumeAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcIndexBits, vid) + err = mountVolumeAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcShardsInfo, vid) if err != nil { return fmt.Errorf("delete ec shards for volume %d: %v", vid, err) } @@ -150,24 +150,24 @@ func isEcDecodeEmptyVolumeErr(err error) bool { return strings.Contains(st.Message(), erasure_coding.EcNoLiveEntriesSubstring) } -func unmountAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error { - return unmountAndDeleteEcShardsWithPrefix("unmountAndDeleteEcShards", grpcDialOption, collection, nodeToEcIndexBits, vid) +func unmountAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, nodeToShardsInfo map[pb.ServerAddress]*erasure_coding.ShardsInfo, vid needle.VolumeId) error { + return unmountAndDeleteEcShardsWithPrefix("unmountAndDeleteEcShards", grpcDialOption, collection, nodeToShardsInfo, vid) } -func unmountAndDeleteEcShardsWithPrefix(prefix string, grpcDialOption grpc.DialOption, collection string, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error { - ewg := NewErrorWaitGroup(len(nodeToEcIndexBits)) +func unmountAndDeleteEcShardsWithPrefix(prefix string, grpcDialOption grpc.DialOption, collection string, nodeToShardsInfo map[pb.ServerAddress]*erasure_coding.ShardsInfo, vid needle.VolumeId) error { + ewg := NewErrorWaitGroup(len(nodeToShardsInfo)) // unmount and delete ec shards in parallel (one goroutine per location) - for location, ecIndexBits := range nodeToEcIndexBits { - location, ecIndexBits := location, ecIndexBits // capture loop variables for goroutine + for location, si := range nodeToShardsInfo { + location, si := location, si // capture loop variables for goroutine ewg.Add(func() error { - fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds()) - if err := unmountEcShards(grpcDialOption, vid, location, ecIndexBits.ToUint32Slice()); err != nil { + fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, si.Ids()) + if err := unmountEcShards(grpcDialOption, vid, location, si.Ids()); err != nil { return fmt.Errorf("%s unmount ec volume %d on %s: %w", prefix, vid, location, err) } - fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds()) - if err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice()); err != nil { + fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, si.Ids()) + if err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, si.Ids()); err != nil { return fmt.Errorf("%s delete ec volume %d on %s: %w", prefix, vid, location, err) } return nil @@ -176,7 +176,7 @@ func unmountAndDeleteEcShardsWithPrefix(prefix string, grpcDialOption grpc.DialO return ewg.Wait() } -func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToEcIndexBits 
map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error { +func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToShardsInfo map[pb.ServerAddress]*erasure_coding.ShardsInfo, vid needle.VolumeId) error { // mount volume if err := operation.WithVolumeServerClient(false, targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { @@ -188,7 +188,7 @@ func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection str return fmt.Errorf("mountVolumeAndDeleteEcShards mount volume %d on %s: %v", vid, targetNodeLocation, err) } - return unmountAndDeleteEcShardsWithPrefix("mountVolumeAndDeleteEcShards", grpcDialOption, collection, nodeToEcIndexBits, vid) + return unmountAndDeleteEcShardsWithPrefix("mountVolumeAndDeleteEcShards", grpcDialOption, collection, nodeToShardsInfo, vid) } func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error { @@ -207,57 +207,57 @@ func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, c } -func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation pb.ServerAddress, err error) { +func collectEcShards(commandEnv *CommandEnv, nodeToShardsInfo map[pb.ServerAddress]*erasure_coding.ShardsInfo, collection string, vid needle.VolumeId) (targetNodeLocation pb.ServerAddress, err error) { maxShardCount := 0 - var existingEcIndexBits erasure_coding.ShardBits - for loc, ecIndexBits := range nodeToEcIndexBits { - toBeCopiedShardCount := ecIndexBits.MinusParityShards().ShardIdCount() + existingShardsInfo := erasure_coding.NewShardsInfo() + for loc, si := range nodeToShardsInfo { + toBeCopiedShardCount := si.MinusParityShards().Count() if toBeCopiedShardCount > maxShardCount { maxShardCount = toBeCopiedShardCount targetNodeLocation = loc - existingEcIndexBits = ecIndexBits + existingShardsInfo = si } } - fmt.Printf("collectEcShards: ec volume %d collect shards to %s from: %+v\n", vid, targetNodeLocation, nodeToEcIndexBits) + fmt.Printf("collectEcShards: ec volume %d collect shards to %s from: %+v\n", vid, targetNodeLocation, nodeToShardsInfo) - var copiedEcIndexBits erasure_coding.ShardBits - for loc, ecIndexBits := range nodeToEcIndexBits { + copiedShardsInfo := erasure_coding.NewShardsInfo() + for loc, si := range nodeToShardsInfo { if loc == targetNodeLocation { continue } - needToCopyEcIndexBits := ecIndexBits.Minus(existingEcIndexBits).MinusParityShards() - if needToCopyEcIndexBits.ShardIdCount() == 0 { + needToCopyShardsInfo := si.Minus(existingShardsInfo).MinusParityShards() + if needToCopyShardsInfo.Count() == 0 { continue } err = operation.WithVolumeServerClient(false, targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation) + fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyShardsInfo.Ids(), loc, targetNodeLocation) _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{ VolumeId: uint32(vid), Collection: collection, - ShardIds: needToCopyEcIndexBits.ToUint32Slice(), + ShardIds: needToCopyShardsInfo.IdsUint32(), CopyEcxFile: false, CopyEcjFile: true, CopyVifFile: true, SourceDataNode: 
string(loc), }) if copyErr != nil { - return fmt.Errorf("copy %d.%v %s => %s : %v\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation, copyErr) + return fmt.Errorf("copy %d.%v %s => %s : %v\n", vid, needToCopyShardsInfo.Ids(), loc, targetNodeLocation, copyErr) } - fmt.Printf("mount %d.%v on %s\n", vid, needToCopyEcIndexBits.ShardIds(), targetNodeLocation) + fmt.Printf("mount %d.%v on %s\n", vid, needToCopyShardsInfo.Ids(), targetNodeLocation) _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{ VolumeId: uint32(vid), Collection: collection, - ShardIds: needToCopyEcIndexBits.ToUint32Slice(), + ShardIds: needToCopyShardsInfo.IdsUint32(), }) if mountErr != nil { - return fmt.Errorf("mount %d.%v on %s : %v\n", vid, needToCopyEcIndexBits.ShardIds(), targetNodeLocation, mountErr) + return fmt.Errorf("mount %d.%v on %s : %v\n", vid, needToCopyShardsInfo.Ids(), targetNodeLocation, mountErr) } return nil @@ -267,14 +267,12 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddr break } - copiedEcIndexBits = copiedEcIndexBits.Plus(needToCopyEcIndexBits) - + copiedShardsInfo.Add(needToCopyShardsInfo) } - nodeToEcIndexBits[targetNodeLocation] = existingEcIndexBits.Plus(copiedEcIndexBits) + nodeToShardsInfo[targetNodeLocation] = existingShardsInfo.Plus(copiedShardsInfo) return targetNodeLocation, err - } func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocations []*master_pb.LookupVolumeResponse_VolumeIdLocation, err error) { @@ -314,18 +312,17 @@ func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern strin return } -func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId, diskType types.DiskType) map[pb.ServerAddress]erasure_coding.ShardBits { - - nodeToEcIndexBits := make(map[pb.ServerAddress]erasure_coding.ShardBits) +func collectEcNodeShardsInfo(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId, diskType types.DiskType) map[pb.ServerAddress]*erasure_coding.ShardsInfo { + res := make(map[pb.ServerAddress]*erasure_coding.ShardsInfo) eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { if diskInfo, found := dn.DiskInfos[string(diskType)]; found { for _, v := range diskInfo.EcShardInfos { if v.Id == uint32(vid) { - nodeToEcIndexBits[pb.NewServerAddressFromDataNode(dn)] = erasure_coding.ShardBits(v.EcIndexBits) + res[pb.NewServerAddressFromDataNode(dn)] = erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(v) } } } }) - return nodeToEcIndexBits + return res } diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index f3c9439da..519f01ff9 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -278,10 +278,7 @@ func doEcEncode(commandEnv *CommandEnv, writer io.Writer, volumeIdToCollection m } // mount all ec shards for the converted volume - shardIds := make([]uint32, erasure_coding.TotalShardsCount) - for i := range shardIds { - shardIds[i] = uint32(i) - } + shardIds := erasure_coding.AllShardIds() ewg.Reset() for _, vid := range volumeIds { diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go index cfc895c7d..396a42b1f 100644 --- a/weed/shell/command_ec_rebuild.go +++ b/weed/shell/command_ec_rebuild.go @@ -149,8 +149,7 @@ func (erb *ecRebuilder) countLocalShards(node *EcNode, collection string, volume for _, diskInfo := range node.info.DiskInfos { for _, ecShardInfo := range 
diskInfo.EcShardInfos { if ecShardInfo.Collection == collection && needle.VolumeId(ecShardInfo.Id) == volumeId { - shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits) - return len(shardBits.ShardIds()) + return erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(ecShardInfo) } } } @@ -266,7 +265,7 @@ func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.Vo fmt.Printf("rebuildOneEcVolume %s %d\n", collection, volumeId) // collect shard files to rebuilder local disk - var generatedShardIds []uint32 + var generatedShardIds []erasure_coding.ShardId copiedShardIds, _, err := erb.prepareDataToRecover(rebuilder, collection, volumeId, locations) if err != nil { return err @@ -306,7 +305,7 @@ func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.Vo return nil } -func (erb *ecRebuilder) generateMissingShards(collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress) (rebuiltShardIds []uint32, err error) { +func (erb *ecRebuilder) generateMissingShards(collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress) (rebuiltShardIds []erasure_coding.ShardId, err error) { err = operation.WithVolumeServerClient(false, sourceLocation, erb.commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, rebuildErr := volumeServerClient.VolumeEcShardsRebuild(context.Background(), &volume_server_pb.VolumeEcShardsRebuildRequest{ @@ -314,34 +313,35 @@ func (erb *ecRebuilder) generateMissingShards(collection string, volumeId needle Collection: collection, }) if rebuildErr == nil { - rebuiltShardIds = resp.RebuiltShardIds + rebuiltShardIds = erasure_coding.Uint32ToShardIds(resp.RebuiltShardIds) } return rebuildErr }) return } -func (erb *ecRebuilder) prepareDataToRecover(rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations) (copiedShardIds []uint32, localShardIds []uint32, err error) { +func (erb *ecRebuilder) prepareDataToRecover(rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations) (copiedShardIds []erasure_coding.ShardId, localShardIds []erasure_coding.ShardId, err error) { needEcxFile := true - var localShardBits erasure_coding.ShardBits + localShardsInfo := erasure_coding.NewShardsInfo() for _, diskInfo := range rebuilder.info.DiskInfos { for _, ecShardInfo := range diskInfo.EcShardInfos { if ecShardInfo.Collection == collection && needle.VolumeId(ecShardInfo.Id) == volumeId { needEcxFile = false - localShardBits = erasure_coding.ShardBits(ecShardInfo.EcIndexBits) + localShardsInfo = erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(ecShardInfo) } } } - for shardId, ecNodes := range locations { + for i, ecNodes := range locations { + shardId := erasure_coding.ShardId(i) if len(ecNodes) == 0 { erb.write("missing shard %d.%d\n", volumeId, shardId) continue } - if localShardBits.HasShardId(erasure_coding.ShardId(shardId)) { - localShardIds = append(localShardIds, uint32(shardId)) + if localShardsInfo.Has(shardId) { + localShardIds = append(localShardIds, shardId) erb.write("use existing shard %d.%d\n", volumeId, shardId) continue } @@ -368,7 +368,7 @@ func (erb *ecRebuilder) prepareDataToRecover(rebuilder *EcNode, collection strin erb.write("%s failed to copy %d.%d from %s: %v\n", rebuilder.info.Id, volumeId, shardId, ecNodes[0].info.Id, copyErr) } else { erb.write("%s copied %d.%d from %s\n", rebuilder.info.Id, volumeId, shardId, ecNodes[0].info.Id) - copiedShardIds = 
append(copiedShardIds, uint32(shardId)) + copiedShardIds = append(copiedShardIds, shardId) } } @@ -394,7 +394,7 @@ func (ecShardMap EcShardMap) registerEcNode(ecNode *EcNode, collection string) { existing = make([][]*EcNode, erasure_coding.MaxShardCount) ecShardMap[needle.VolumeId(shardInfo.Id)] = existing } - for _, shardId := range erasure_coding.ShardBits(shardInfo.EcIndexBits).ShardIds() { + for _, shardId := range erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(shardInfo).Ids() { existing[shardId] = append(existing[shardId], ecNode) } } diff --git a/weed/shell/command_ec_rebuild_test.go b/weed/shell/command_ec_rebuild_test.go index b732e210a..4b076564b 100644 --- a/weed/shell/command_ec_rebuild_test.go +++ b/weed/shell/command_ec_rebuild_test.go @@ -15,9 +15,9 @@ func TestEcShardMapRegister(t *testing.T) { // Create test nodes with EC shards node1 := newEcNode("dc1", "rack1", "node1", 100). - addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}) + addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6}) node2 := newEcNode("dc1", "rack1", "node2", 100). - addEcVolumeAndShardsForTest(1, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}) + addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{7, 8, 9, 10, 11, 12, 13}) ecShardMap.registerEcNode(node1, "c1") ecShardMap.registerEcNode(node2, "c1") @@ -51,15 +51,15 @@ func TestEcShardMapRegister(t *testing.T) { func TestEcShardMapShardCount(t *testing.T) { testCases := []struct { name string - shardIds []uint32 + shardIds []erasure_coding.ShardId expectedCount int }{ - {"all shards", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}, 14}, - {"data shards only", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, 10}, - {"parity shards only", []uint32{10, 11, 12, 13}, 4}, - {"missing some shards", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8}, 9}, - {"single shard", []uint32{0}, 1}, - {"no shards", []uint32{}, 0}, + {"all shards", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}, 14}, + {"data shards only", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, 10}, + {"parity shards only", []erasure_coding.ShardId{10, 11, 12, 13}, 4}, + {"missing some shards", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6, 7, 8}, 9}, + {"single shard", []erasure_coding.ShardId{0}, 1}, + {"no shards", []erasure_coding.ShardId{}, 0}, } for _, tc := range testCases { @@ -85,7 +85,7 @@ func TestRebuildEcVolumesInsufficientShards(t *testing.T) { // Create a volume with insufficient shards (less than DataShardsCount) node1 := newEcNode("dc1", "rack1", "node1", 100). - addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4}) // Only 5 shards + addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4}) // Only 5 shards erb := &ecRebuilder{ commandEnv: &CommandEnv{ @@ -114,7 +114,7 @@ func TestRebuildEcVolumesCompleteVolume(t *testing.T) { // Create a volume with all shards node1 := newEcNode("dc1", "rack1", "node1", 100). - addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}) + addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}) erb := &ecRebuilder{ commandEnv: &CommandEnv{ @@ -146,7 +146,7 @@ func TestRebuildEcVolumesInsufficientSpace(t *testing.T) { // Node has 10 local shards, missing 4 shards (10,11,12,13), so needs 4 free slots // Set free slots to 3 (insufficient) node1 := newEcNode("dc1", "rack1", "node1", 3). 
// Only 3 free slots, need 4 - addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) + addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) erb := &ecRebuilder{ commandEnv: &CommandEnv{ @@ -180,11 +180,11 @@ func TestMultipleNodesWithShards(t *testing.T) { // Create 3 nodes with different shards node1 := newEcNode("dc1", "rack1", "node1", 100). - addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3}) + addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0, 1, 2, 3}) node2 := newEcNode("dc1", "rack1", "node2", 100). - addEcVolumeAndShardsForTest(1, "c1", []uint32{4, 5, 6, 7}) + addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{4, 5, 6, 7}) node3 := newEcNode("dc1", "rack1", "node3", 100). - addEcVolumeAndShardsForTest(1, "c1", []uint32{8, 9}) + addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{8, 9}) ecShardMap.registerEcNode(node1, "c1") ecShardMap.registerEcNode(node2, "c1") @@ -222,9 +222,9 @@ func TestDuplicateShards(t *testing.T) { // Create 2 nodes with overlapping shards (both have shard 0) node1 := newEcNode("dc1", "rack1", "node1", 100). - addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3}) + addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0, 1, 2, 3}) node2 := newEcNode("dc1", "rack1", "node2", 100). - addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 4, 5, 6}) // Duplicate shard 0 + addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0, 4, 5, 6}) // Duplicate shard 0 ecShardMap.registerEcNode(node1, "c1") ecShardMap.registerEcNode(node2, "c1") diff --git a/weed/shell/command_ec_test.go b/weed/shell/command_ec_test.go index 073c80ad4..1bf3924ff 100644 --- a/weed/shell/command_ec_test.go +++ b/weed/shell/command_ec_test.go @@ -12,8 +12,8 @@ import ( func TestCommandEcBalanceSmall(t *testing.T) { ecb := &ecBalancer{ ecNodes: []*EcNode{ - newEcNode("dc1", "rack1", "dn1", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}), - newEcNode("dc1", "rack2", "dn2", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}), + newEcNode("dc1", "rack1", "dn1", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}), + newEcNode("dc1", "rack2", "dn2", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}), }, applyBalancing: false, diskType: types.HardDriveType, @@ -26,11 +26,11 @@ func TestCommandEcBalanceNothingToMove(t *testing.T) { ecb := &ecBalancer{ ecNodes: []*EcNode{ newEcNode("dc1", "rack1", "dn1", 100). - addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}). - addEcVolumeAndShardsForTest(2, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}), + addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6}). + addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{7, 8, 9, 10, 11, 12, 13}), newEcNode("dc1", "rack1", "dn2", 100). - addEcVolumeAndShardsForTest(1, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}). - addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}), + addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{7, 8, 9, 10, 11, 12, 13}). 
+ addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6}), }, applyBalancing: false, diskType: types.HardDriveType, @@ -43,11 +43,11 @@ func TestCommandEcBalanceAddNewServers(t *testing.T) { ecb := &ecBalancer{ ecNodes: []*EcNode{ newEcNode("dc1", "rack1", "dn1", 100). - addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}). - addEcVolumeAndShardsForTest(2, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}), + addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6}). + addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{7, 8, 9, 10, 11, 12, 13}), newEcNode("dc1", "rack1", "dn2", 100). - addEcVolumeAndShardsForTest(1, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}). - addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}), + addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{7, 8, 9, 10, 11, 12, 13}). + addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6}), newEcNode("dc1", "rack1", "dn3", 100), newEcNode("dc1", "rack1", "dn4", 100), }, @@ -62,11 +62,11 @@ func TestCommandEcBalanceAddNewRacks(t *testing.T) { ecb := &ecBalancer{ ecNodes: []*EcNode{ newEcNode("dc1", "rack1", "dn1", 100). - addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}). - addEcVolumeAndShardsForTest(2, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}), + addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6}). + addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{7, 8, 9, 10, 11, 12, 13}), newEcNode("dc1", "rack1", "dn2", 100). - addEcVolumeAndShardsForTest(1, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}). - addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}), + addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{7, 8, 9, 10, 11, 12, 13}). + addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6}), newEcNode("dc1", "rack2", "dn3", 100), newEcNode("dc1", "rack2", "dn4", 100), }, @@ -81,36 +81,36 @@ func TestCommandEcBalanceVolumeEvenButRackUneven(t *testing.T) { ecb := ecBalancer{ ecNodes: []*EcNode{ newEcNode("dc1", "rack1", "dn_shared", 100). - addEcVolumeAndShardsForTest(1, "c1", []uint32{0}). 
- addEcVolumeAndShardsForTest(2, "c1", []uint32{0}), - - newEcNode("dc1", "rack1", "dn_a1", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{1}), - newEcNode("dc1", "rack1", "dn_a2", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{2}), - newEcNode("dc1", "rack1", "dn_a3", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{3}), - newEcNode("dc1", "rack1", "dn_a4", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{4}), - newEcNode("dc1", "rack1", "dn_a5", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{5}), - newEcNode("dc1", "rack1", "dn_a6", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{6}), - newEcNode("dc1", "rack1", "dn_a7", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{7}), - newEcNode("dc1", "rack1", "dn_a8", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{8}), - newEcNode("dc1", "rack1", "dn_a9", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{9}), - newEcNode("dc1", "rack1", "dn_a10", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{10}), - newEcNode("dc1", "rack1", "dn_a11", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{11}), - newEcNode("dc1", "rack1", "dn_a12", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{12}), - newEcNode("dc1", "rack1", "dn_a13", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{13}), - - newEcNode("dc1", "rack1", "dn_b1", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{1}), - newEcNode("dc1", "rack1", "dn_b2", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{2}), - newEcNode("dc1", "rack1", "dn_b3", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{3}), - newEcNode("dc1", "rack1", "dn_b4", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{4}), - newEcNode("dc1", "rack1", "dn_b5", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{5}), - newEcNode("dc1", "rack1", "dn_b6", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{6}), - newEcNode("dc1", "rack1", "dn_b7", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{7}), - newEcNode("dc1", "rack1", "dn_b8", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{8}), - newEcNode("dc1", "rack1", "dn_b9", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{9}), - newEcNode("dc1", "rack1", "dn_b10", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{10}), - newEcNode("dc1", "rack1", "dn_b11", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{11}), - newEcNode("dc1", "rack1", "dn_b12", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{12}), - newEcNode("dc1", "rack1", "dn_b13", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{13}), + addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0}). 
+ addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{0}), + + newEcNode("dc1", "rack1", "dn_a1", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{1}), + newEcNode("dc1", "rack1", "dn_a2", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{2}), + newEcNode("dc1", "rack1", "dn_a3", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{3}), + newEcNode("dc1", "rack1", "dn_a4", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{4}), + newEcNode("dc1", "rack1", "dn_a5", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{5}), + newEcNode("dc1", "rack1", "dn_a6", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{6}), + newEcNode("dc1", "rack1", "dn_a7", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{7}), + newEcNode("dc1", "rack1", "dn_a8", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{8}), + newEcNode("dc1", "rack1", "dn_a9", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{9}), + newEcNode("dc1", "rack1", "dn_a10", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{10}), + newEcNode("dc1", "rack1", "dn_a11", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{11}), + newEcNode("dc1", "rack1", "dn_a12", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{12}), + newEcNode("dc1", "rack1", "dn_a13", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{13}), + + newEcNode("dc1", "rack1", "dn_b1", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{1}), + newEcNode("dc1", "rack1", "dn_b2", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{2}), + newEcNode("dc1", "rack1", "dn_b3", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{3}), + newEcNode("dc1", "rack1", "dn_b4", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{4}), + newEcNode("dc1", "rack1", "dn_b5", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{5}), + newEcNode("dc1", "rack1", "dn_b6", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{6}), + newEcNode("dc1", "rack1", "dn_b7", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{7}), + newEcNode("dc1", "rack1", "dn_b8", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{8}), + newEcNode("dc1", "rack1", "dn_b9", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{9}), + newEcNode("dc1", "rack1", "dn_b10", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{10}), + newEcNode("dc1", "rack1", "dn_b11", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{11}), + newEcNode("dc1", "rack1", "dn_b12", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{12}), + newEcNode("dc1", "rack1", "dn_b13", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{13}), newEcNode("dc1", "rack1", "dn3", 100), }, @@ -134,7 +134,7 @@ func newEcNode(dc string, rack string, dataNodeId string, freeEcSlot int) *EcNod } } -func (ecNode *EcNode) addEcVolumeAndShardsForTest(vid uint32, collection string, shardIds []uint32) *EcNode { +func (ecNode *EcNode) addEcVolumeAndShardsForTest(vid uint32, collection string, shardIds []erasure_coding.ShardId) *EcNode { return ecNode.addEcVolumeShards(needle.VolumeId(vid), collection, shardIds, types.HardDriveType) } @@ -146,7 +146,7 @@ func TestCommandEcBalanceEvenDataAndParityDistribution(t *testing.T) { ecb := &ecBalancer{ ecNodes: []*EcNode{ // All shards 
initially on rack1/dn1 - newEcNode("dc1", "rack1", "dn1", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}), + newEcNode("dc1", "rack1", "dn1", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}), // Empty nodes on other racks newEcNode("dc1", "rack2", "dn2", 100), newEcNode("dc1", "rack3", "dn3", 100), @@ -163,7 +163,7 @@ func TestCommandEcBalanceEvenDataAndParityDistribution(t *testing.T) { // After balancing (dry-run), verify the PLANNED distribution by checking what moves were proposed // The ecb.ecNodes state is updated during dry-run to track planned moves vid := needle.VolumeId(1) - dataShardCount := erasure_coding.DataShardsCount // 10 + dataShardCount := erasure_coding.DataShardsCount // 10 parityShardCount := erasure_coding.ParityShardsCount // 4 // Count data and parity shards per rack based on current (updated) state @@ -172,7 +172,7 @@ func TestCommandEcBalanceEvenDataAndParityDistribution(t *testing.T) { // With 6 racks: // - Data shards (10): max 2 per rack (ceil(10/6) = 2) // - Parity shards (4): max 1 per rack (ceil(4/6) = 1) - maxDataPerRack := ceilDivide(dataShardCount, 6) // 2 + maxDataPerRack := ceilDivide(dataShardCount, 6) // 2 maxParityPerRack := ceilDivide(parityShardCount, 6) // 1 // Verify no rack has more than max data shards @@ -229,8 +229,8 @@ func countDataAndParityShardsPerRack(ecNodes []*EcNode, vid needle.VolumeId, dat parityPerRack = make(map[string]int) for _, ecNode := range ecNodes { - shardBits := findEcVolumeShards(ecNode, vid, types.HardDriveType) - for _, shardId := range shardBits.ShardIds() { + si := findEcVolumeShardsInfo(ecNode, vid, types.HardDriveType) + for _, shardId := range si.Ids() { rackId := string(ecNode.rack) if int(shardId) < dataShardCount { dataPerRack[rackId]++ @@ -249,9 +249,9 @@ func TestCommandEcBalanceMultipleVolumesEvenDistribution(t *testing.T) { ecb := &ecBalancer{ ecNodes: []*EcNode{ // Volume 1: all shards on rack1 - newEcNode("dc1", "rack1", "dn1", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}), + newEcNode("dc1", "rack1", "dn1", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}), // Volume 2: all shards on rack2 - newEcNode("dc1", "rack2", "dn2", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}), + newEcNode("dc1", "rack2", "dn2", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}), // Empty nodes on other racks newEcNode("dc1", "rack3", "dn3", 100), newEcNode("dc1", "rack4", "dn4", 100), diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index 88058807f..f86d5ff94 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -255,7 +255,7 @@ func capacityByMaxVolumeCount(diskType types.DiskType) CapacityFunc { } var ecShardCount int for _, ecShardInfo := range diskInfo.EcShardInfos { - ecShardCount += erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIdCount() + ecShardCount += erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(ecShardInfo) } return float64(diskInfo.MaxVolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount } @@ -269,7 +269,7 @@ func capacityByFreeVolumeCount(diskType types.DiskType) CapacityFunc { } var ecShardCount int for _, ecShardInfo := range diskInfo.EcShardInfos { - 
ecShardCount += erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIdCount() + ecShardCount += erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(ecShardInfo) } return float64(diskInfo.MaxVolumeCount-diskInfo.VolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount } diff --git a/weed/shell/command_volume_list.go b/weed/shell/command_volume_list.go index f57c7f5be..5904d6f77 100644 --- a/weed/shell/command_volume_list.go +++ b/weed/shell/command_volume_list.go @@ -251,25 +251,22 @@ func (c *commandVolumeList) writeDiskInfo(writer io.Writer, t *master_pb.DiskInf } // Build shard size information - shardIds := erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds() + si := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(ecShardInfo) var totalSize int64 var shardSizeInfo string if len(ecShardInfo.ShardSizes) > 0 { var shardDetails []string - for _, shardId := range shardIds { - if size, found := erasure_coding.GetShardSize(ecShardInfo, erasure_coding.ShardId(shardId)); found { - shardDetails = append(shardDetails, fmt.Sprintf("%d:%s", shardId, util.BytesToHumanReadable(uint64(size)))) - totalSize += size - } else { - shardDetails = append(shardDetails, fmt.Sprintf("%d:?", shardId)) - } + for _, shardId := range si.Ids() { + size := int64(si.Size(shardId)) + shardDetails = append(shardDetails, fmt.Sprintf("%d:%s", shardId, util.BytesToHumanReadable(uint64(size)))) + totalSize += size } shardSizeInfo = fmt.Sprintf(" sizes:[%s] total:%s", strings.Join(shardDetails, " "), util.BytesToHumanReadable(uint64(totalSize))) } output(verbosityLevel >= 5, writer, " ec volume id:%v collection:%v shards:%v%s %s\n", - ecShardInfo.Id, ecShardInfo.Collection, shardIds, shardSizeInfo, expireAtString) + ecShardInfo.Id, ecShardInfo.Collection, si.Ids(), shardSizeInfo, expireAtString) } output((volumeInfosFound || ecShardInfoFound) && verbosityLevel >= 4, writer, " Disk %s %+v \n", diskType, s) return s diff --git a/weed/shell/command_volume_list_test.go b/weed/shell/command_volume_list_test.go index 8b85efc95..a0d272d80 100644 --- a/weed/shell/command_volume_list_test.go +++ b/weed/shell/command_volume_list_test.go @@ -108,15 +108,17 @@ func parseOutput(output string) *master_pb.TopologyInfo { if strings.HasPrefix(part, "collection:") { ecShard.Collection = part[len("collection:"):] } + // TODO: we need to parse EC shard sizes as well if strings.HasPrefix(part, "shards:") { shards := part[len("shards:["):] shards = strings.TrimRight(shards, "]") - shardBits := erasure_coding.ShardBits(0) + shardsInfo := erasure_coding.NewShardsInfo() for _, shardId := range strings.Split(shards, ",") { sid, _ := strconv.Atoi(shardId) - shardBits = shardBits.AddShardId(erasure_coding.ShardId(sid)) + shardsInfo.Set(erasure_coding.ShardId(sid), 0) } - ecShard.EcIndexBits = uint32(shardBits) + ecShard.EcIndexBits = shardsInfo.Bitmap() + ecShard.ShardSizes = shardsInfo.SizesInt64() } } disk.EcShardInfos = append(disk.EcShardInfos, ecShard) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index a13e8e671..04e982f93 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -197,8 +197,8 @@ func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, } func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool, diskType 
types.DiskType, writer io.Writer) (hasMoved bool, err error) { - - for _, shardId := range erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds() { + si := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(ecShardInfo) + for _, shardId := range si.Ids() { // Sort by: 1) fewest shards of this volume, 2) most free EC slots // This ensures we prefer nodes with capacity and balanced shard distribution slices.SortFunc(otherNodes, func(a, b *EcNode) int { diff --git a/weed/shell/ec_proportional_rebalance.go b/weed/shell/ec_proportional_rebalance.go index d01803001..52adf4297 100644 --- a/weed/shell/ec_proportional_rebalance.go +++ b/weed/shell/ec_proportional_rebalance.go @@ -43,7 +43,7 @@ func NewTopologyDistributionAnalysis() *TopologyDistributionAnalysis { } // AddNode adds a node and its shards to the analysis -func (a *TopologyDistributionAnalysis) AddNode(node *EcNode, shardBits erasure_coding.ShardBits) { +func (a *TopologyDistributionAnalysis) AddNode(node *EcNode, shardsInfo *erasure_coding.ShardsInfo) { nodeId := node.info.Id // Create distribution.TopologyNode from EcNode @@ -52,18 +52,15 @@ func (a *TopologyDistributionAnalysis) AddNode(node *EcNode, shardBits erasure_c DataCenter: string(node.dc), Rack: string(node.rack), FreeSlots: node.freeEcSlot, - TotalShards: shardBits.ShardIdCount(), - } - - for _, shardId := range shardBits.ShardIds() { - topoNode.ShardIDs = append(topoNode.ShardIDs, int(shardId)) + TotalShards: shardsInfo.Count(), + ShardIDs: shardsInfo.IdsInt(), } a.inner.AddNode(topoNode) a.nodeMap[nodeId] = node // Add shard locations - for _, shardId := range shardBits.ShardIds() { + for _, shardId := range shardsInfo.Ids() { a.inner.AddShardLocation(distribution.ShardLocation{ ShardID: int(shardId), NodeID: nodeId, @@ -120,9 +117,9 @@ func AnalyzeVolumeDistribution(volumeId needle.VolumeId, locations []*EcNode, di analysis := NewTopologyDistributionAnalysis() for _, node := range locations { - shardBits := findEcVolumeShards(node, volumeId, diskType) - if shardBits.ShardIdCount() > 0 { - analysis.AddNode(node, shardBits) + si := findEcVolumeShardsInfo(node, volumeId, diskType) + if si.Count() > 0 { + analysis.AddNode(node, si) } } @@ -207,8 +204,8 @@ func (r *ProportionalECRebalancer) PlanMoves( // Add shard locations from nodes that have shards for _, node := range locations { nodeId := node.info.Id - shardBits := findEcVolumeShards(node, volumeId, r.diskType) - for _, shardId := range shardBits.ShardIds() { + si := findEcVolumeShardsInfo(node, volumeId, r.diskType) + for _, shardId := range si.Ids() { analysis.AddShardLocation(distribution.ShardLocation{ ShardID: int(shardId), NodeID: nodeId, diff --git a/weed/shell/ec_rebalance_slots_test.go b/weed/shell/ec_rebalance_slots_test.go index 4a55c9bce..093df1ae3 100644 --- a/weed/shell/ec_rebalance_slots_test.go +++ b/weed/shell/ec_rebalance_slots_test.go @@ -44,7 +44,7 @@ func TestECRebalanceWithLimitedSlots(t *testing.T) { shardCount := 0 for _, diskInfo := range node.info.DiskInfos { for _, ecShard := range diskInfo.EcShardInfos { - shardCount += erasure_coding.ShardBits(ecShard.EcIndexBits).ShardIdCount() + shardCount += erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(ecShard) } } t.Logf(" Node %s (rack %s): %d shards, %d free slots", @@ -56,7 +56,7 @@ func TestECRebalanceWithLimitedSlots(t *testing.T) { for _, node := range ecNodes { for _, diskInfo := range node.info.DiskInfos { for _, ecShard := range diskInfo.EcShardInfos { - totalEcShards += 
erasure_coding.ShardBits(ecShard.EcIndexBits).ShardIdCount() + totalEcShards += erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(ecShard) } } } @@ -122,7 +122,7 @@ func TestECRebalanceZeroFreeSlots(t *testing.T) { shardCount := 0 for _, diskInfo := range node.info.DiskInfos { for _, ecShard := range diskInfo.EcShardInfos { - shardCount += erasure_coding.ShardBits(ecShard.EcIndexBits).ShardIdCount() + shardCount += erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(ecShard) } } t.Logf(" Node %s: %d shards, %d free slots, volumeCount=%d, max=%d", @@ -226,14 +226,15 @@ func buildZeroFreeSlotTopology() *master_pb.TopologyInfo { func buildEcShards(volumeIds []uint32) []*master_pb.VolumeEcShardInformationMessage { var shards []*master_pb.VolumeEcShardInformationMessage for _, vid := range volumeIds { - allShardBits := erasure_coding.ShardBits(0) - for i := 0; i < erasure_coding.TotalShardsCount; i++ { - allShardBits = allShardBits.AddShardId(erasure_coding.ShardId(i)) + si := erasure_coding.NewShardsInfo() + for _, id := range erasure_coding.AllShardIds() { + si.Set(id, 1234) } shards = append(shards, &master_pb.VolumeEcShardInformationMessage{ Id: vid, Collection: "ectest", - EcIndexBits: uint32(allShardBits), + EcIndexBits: si.Bitmap(), + ShardSizes: si.SizesInt64(), }) } return shards diff --git a/weed/storage/erasure_coding/ec_shard.go b/weed/storage/erasure_coding/ec_shard.go index e55a9f676..010277be1 100644 --- a/weed/storage/erasure_coding/ec_shard.go +++ b/weed/storage/erasure_coding/ec_shard.go @@ -15,6 +15,33 @@ import ( type ShardId uint8 +// Converts a slice of uint32s to ShardId. +func Uint32ToShardIds(ids []uint32) []ShardId { + res := make([]ShardId, len(ids)) + for i, id := range ids { + res[i] = ShardId(id) + } + return res +} + +// Converts a slice of ShardIds to uint32 +func ShardIdsToUint32(ids []ShardId) []uint32 { + res := make([]uint32, len(ids)) + for i, id := range ids { + res[i] = uint32(id) + } + return res +} + +// Returns a slice of all possible ShardIds. 
+func AllShardIds() []ShardId { + res := make([]ShardId, TotalShardsCount) + for i := range res { + res[i] = ShardId(i) + } + return res +} + type EcVolumeShard struct { VolumeId needle.VolumeId ShardId ShardId diff --git a/weed/storage/erasure_coding/ec_shard_size_helper.go b/weed/storage/erasure_coding/ec_shard_size_helper.go deleted file mode 100644 index 43d9a4f2f..000000000 --- a/weed/storage/erasure_coding/ec_shard_size_helper.go +++ /dev/null @@ -1,68 +0,0 @@ -package erasure_coding - -import ( - "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" -) - -// GetShardSize returns the size of a specific shard from VolumeEcShardInformationMessage -// Returns the size and true if the shard exists, 0 and false if not present -func GetShardSize(msg *master_pb.VolumeEcShardInformationMessage, shardId ShardId) (size int64, found bool) { - if msg == nil || msg.ShardSizes == nil { - return 0, false - } - - shardBits := ShardBits(msg.EcIndexBits) - index, found := shardBits.ShardIdToIndex(shardId) - if !found || index >= len(msg.ShardSizes) { - return 0, false - } - - return msg.ShardSizes[index], true -} - -// SetShardSize sets the size of a specific shard in VolumeEcShardInformationMessage -// Returns true if successful, false if the shard is not present in EcIndexBits -func SetShardSize(msg *master_pb.VolumeEcShardInformationMessage, shardId ShardId, size int64) bool { - if msg == nil { - return false - } - - shardBits := ShardBits(msg.EcIndexBits) - index, found := shardBits.ShardIdToIndex(shardId) - if !found { - return false - } - - // Initialize ShardSizes slice if needed - expectedLength := shardBits.ShardIdCount() - if msg.ShardSizes == nil { - msg.ShardSizes = make([]int64, expectedLength) - } else if len(msg.ShardSizes) != expectedLength { - // Resize the slice to match the expected length - newSizes := make([]int64, expectedLength) - copy(newSizes, msg.ShardSizes) - msg.ShardSizes = newSizes - } - - if index >= len(msg.ShardSizes) { - return false - } - - msg.ShardSizes[index] = size - return true -} - -// InitializeShardSizes initializes the ShardSizes slice based on EcIndexBits -// This ensures the slice has the correct length for all present shards -func InitializeShardSizes(msg *master_pb.VolumeEcShardInformationMessage) { - if msg == nil { - return - } - - shardBits := ShardBits(msg.EcIndexBits) - expectedLength := shardBits.ShardIdCount() - - if msg.ShardSizes == nil || len(msg.ShardSizes) != expectedLength { - msg.ShardSizes = make([]int64, expectedLength) - } -} diff --git a/weed/storage/erasure_coding/ec_shard_size_helper_test.go b/weed/storage/erasure_coding/ec_shard_size_helper_test.go deleted file mode 100644 index 2ef54c949..000000000 --- a/weed/storage/erasure_coding/ec_shard_size_helper_test.go +++ /dev/null @@ -1,117 +0,0 @@ -package erasure_coding - -import ( - "testing" - - "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" -) - -func TestShardSizeHelpers(t *testing.T) { - // Create a message with shards 0, 2, and 5 present (EcIndexBits = 0b100101 = 37) - msg := &master_pb.VolumeEcShardInformationMessage{ - Id: 123, - EcIndexBits: 37, // Binary: 100101, shards 0, 2, 5 are present - } - - // Test SetShardSize - if !SetShardSize(msg, 0, 1000) { - t.Error("Failed to set size for shard 0") - } - if !SetShardSize(msg, 2, 2000) { - t.Error("Failed to set size for shard 2") - } - if !SetShardSize(msg, 5, 5000) { - t.Error("Failed to set size for shard 5") - } - - // Test setting size for non-present shard should fail - if SetShardSize(msg, 1, 1500) { - t.Error("Should not 
be able to set size for non-present shard 1") - } - - // Verify ShardSizes slice has correct length (3 shards) - if len(msg.ShardSizes) != 3 { - t.Errorf("Expected ShardSizes length 3, got %d", len(msg.ShardSizes)) - } - - // Test GetShardSize - if size, found := GetShardSize(msg, 0); !found || size != 1000 { - t.Errorf("Expected shard 0 size 1000, got %d (found: %v)", size, found) - } - if size, found := GetShardSize(msg, 2); !found || size != 2000 { - t.Errorf("Expected shard 2 size 2000, got %d (found: %v)", size, found) - } - if size, found := GetShardSize(msg, 5); !found || size != 5000 { - t.Errorf("Expected shard 5 size 5000, got %d (found: %v)", size, found) - } - - // Test getting size for non-present shard - if size, found := GetShardSize(msg, 1); found { - t.Errorf("Should not find shard 1, but got size %d", size) - } - - // Test direct slice access - if len(msg.ShardSizes) != 3 { - t.Errorf("Expected 3 shard sizes in slice, got %d", len(msg.ShardSizes)) - } - - expectedSizes := []int64{1000, 2000, 5000} // Ordered by shard ID: 0, 2, 5 - for i, expectedSize := range expectedSizes { - if i < len(msg.ShardSizes) && msg.ShardSizes[i] != expectedSize { - t.Errorf("Expected ShardSizes[%d] = %d, got %d", i, expectedSize, msg.ShardSizes[i]) - } - } -} - -func TestShardBitsHelpers(t *testing.T) { - // Test with EcIndexBits = 37 (binary: 100101, shards 0, 2, 5) - shardBits := ShardBits(37) - - // Test ShardIdToIndex - if index, found := shardBits.ShardIdToIndex(0); !found || index != 0 { - t.Errorf("Expected shard 0 at index 0, got %d (found: %v)", index, found) - } - if index, found := shardBits.ShardIdToIndex(2); !found || index != 1 { - t.Errorf("Expected shard 2 at index 1, got %d (found: %v)", index, found) - } - if index, found := shardBits.ShardIdToIndex(5); !found || index != 2 { - t.Errorf("Expected shard 5 at index 2, got %d (found: %v)", index, found) - } - - // Test for non-present shard - if index, found := shardBits.ShardIdToIndex(1); found { - t.Errorf("Should not find shard 1, but got index %d", index) - } - - // Test IndexToShardId - if shardId, found := shardBits.IndexToShardId(0); !found || shardId != 0 { - t.Errorf("Expected index 0 to be shard 0, got %d (found: %v)", shardId, found) - } - if shardId, found := shardBits.IndexToShardId(1); !found || shardId != 2 { - t.Errorf("Expected index 1 to be shard 2, got %d (found: %v)", shardId, found) - } - if shardId, found := shardBits.IndexToShardId(2); !found || shardId != 5 { - t.Errorf("Expected index 2 to be shard 5, got %d (found: %v)", shardId, found) - } - - // Test for invalid index - if shardId, found := shardBits.IndexToShardId(3); found { - t.Errorf("Should not find shard for index 3, but got shard %d", shardId) - } - - // Test EachSetIndex - var collectedShards []ShardId - shardBits.EachSetIndex(func(shardId ShardId) { - collectedShards = append(collectedShards, shardId) - }) - expectedShards := []ShardId{0, 2, 5} - if len(collectedShards) != len(expectedShards) { - t.Errorf("Expected EachSetIndex to collect %v, got %v", expectedShards, collectedShards) - } - for i, expected := range expectedShards { - if i >= len(collectedShards) || collectedShards[i] != expected { - t.Errorf("Expected EachSetIndex to collect %v, got %v", expectedShards, collectedShards) - break - } - } -} diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index 72aeb5e40..8f2353472 100644 --- a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -3,7 +3,6 @@ 
package erasure_coding import ( "errors" "fmt" - "math" "os" "slices" "sync" @@ -185,7 +184,6 @@ func (ev *EcVolume) Sync() { } func (ev *EcVolume) Destroy() { - ev.Close() for _, s := range ev.Shards { @@ -240,29 +238,12 @@ func (ev *EcVolume) ShardIdList() (shardIds []ShardId) { return } -type ShardInfo struct { - ShardId ShardId - Size uint64 -} - -func (ev *EcVolume) ShardDetails() (shards []ShardInfo) { - for _, s := range ev.Shards { - shardSize := s.Size() - if shardSize >= 0 { - shards = append(shards, ShardInfo{ - ShardId: s.ShardId, - Size: uint64(shardSize), - }) - } - } - return -} - func (ev *EcVolume) ToVolumeEcShardInformationMessage(diskId uint32) (messages []*master_pb.VolumeEcShardInformationMessage) { - prevVolumeId := needle.VolumeId(math.MaxUint32) - var m *master_pb.VolumeEcShardInformationMessage + ecInfoPerVolume := map[needle.VolumeId]*master_pb.VolumeEcShardInformationMessage{} + for _, s := range ev.Shards { - if s.VolumeId != prevVolumeId { + m, ok := ecInfoPerVolume[s.VolumeId] + if !ok { m = &master_pb.VolumeEcShardInformationMessage{ Id: uint32(s.VolumeId), Collection: s.Collection, @@ -270,13 +251,18 @@ func (ev *EcVolume) ToVolumeEcShardInformationMessage(diskId uint32) (messages [ ExpireAtSec: ev.ExpireAtSec, DiskId: diskId, } - messages = append(messages, m) + ecInfoPerVolume[s.VolumeId] = m } - prevVolumeId = s.VolumeId - m.EcIndexBits = uint32(ShardBits(m.EcIndexBits).AddShardId(s.ShardId)) - // Add shard size information using the optimized format - SetShardSize(m, s.ShardId, s.Size()) + // Update EC shard bits and sizes. + si := ShardsInfoFromVolumeEcShardInformationMessage(m) + si.Set(s.ShardId, ShardSize(s.Size())) + m.EcIndexBits = uint32(si.Bitmap()) + m.ShardSizes = si.SizesInt64() + } + + for _, m := range ecInfoPerVolume { + messages = append(messages, m) } return } diff --git a/weed/storage/erasure_coding/ec_volume_info.go b/weed/storage/erasure_coding/ec_volume_info.go index 4d34ccbde..ca98bb658 100644 --- a/weed/storage/erasure_coding/ec_volume_info.go +++ b/weed/storage/erasure_coding/ec_volume_info.go @@ -1,260 +1,294 @@ package erasure_coding import ( - "math/bits" + "fmt" + "sort" + "github.com/dustin/go-humanize" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/storage/needle" ) -// data structure used in master -type EcVolumeInfo struct { - VolumeId needle.VolumeId - Collection string - ShardBits ShardBits - DiskType string - DiskId uint32 // ID of the disk this EC volume is on - ExpireAtSec uint64 // ec volume destroy time, calculated from the ec volume was created - ShardSizes []int64 // optimized: sizes for shards in order of set bits in ShardBits +// ShardsInfo encapsulates information for EC shards +type ShardSize int64 +type ShardInfo struct { + Id ShardId + Size ShardSize +} +type ShardsInfo struct { + shards map[ShardId]*ShardInfo } -func (ecInfo *EcVolumeInfo) AddShardId(id ShardId) { - oldBits := ecInfo.ShardBits - ecInfo.ShardBits = ecInfo.ShardBits.AddShardId(id) +func NewShardsInfo() *ShardsInfo { + return &ShardsInfo{ + shards: map[ShardId]*ShardInfo{}, + } +} - // If shard was actually added, resize ShardSizes array - if oldBits != ecInfo.ShardBits { - ecInfo.resizeShardSizes(oldBits) +// Initializes a ShardsInfo from a ECVolume. 
+func ShardsInfoFromVolume(ev *EcVolume) *ShardsInfo { + res := &ShardsInfo{ + shards: map[ShardId]*ShardInfo{}, + } + for _, s := range ev.Shards { + res.Set(s.ShardId, ShardSize(s.Size())) } + return res } -func (ecInfo *EcVolumeInfo) RemoveShardId(id ShardId) { - oldBits := ecInfo.ShardBits - ecInfo.ShardBits = ecInfo.ShardBits.RemoveShardId(id) +// Initializes a ShardsInfo from a VolumeEcShardInformationMessage proto. +func ShardsInfoFromVolumeEcShardInformationMessage(vi *master_pb.VolumeEcShardInformationMessage) *ShardsInfo { + res := NewShardsInfo() + if vi == nil { + return res + } - // If shard was actually removed, resize ShardSizes array - if oldBits != ecInfo.ShardBits { - ecInfo.resizeShardSizes(oldBits) + var id ShardId + var j int + for bitmap := vi.EcIndexBits; bitmap != 0; bitmap >>= 1 { + if bitmap&1 != 0 { + var size ShardSize + if j < len(vi.ShardSizes) { + size = ShardSize(vi.ShardSizes[j]) + } + j++ + res.shards[id] = &ShardInfo{ + Id: id, + Size: size, + } + } + id++ } + + return res } -func (ecInfo *EcVolumeInfo) SetShardSize(id ShardId, size int64) { - ecInfo.ensureShardSizesInitialized() - if index, found := ecInfo.ShardBits.ShardIdToIndex(id); found && index < len(ecInfo.ShardSizes) { - ecInfo.ShardSizes[index] = size +// Returns a count of shards from a VolumeEcShardInformationMessage proto. +func ShardsCountFromVolumeEcShardInformationMessage(vi *master_pb.VolumeEcShardInformationMessage) int { + if vi == nil { + return 0 } + + return ShardsInfoFromVolumeEcShardInformationMessage(vi).Count() } -func (ecInfo *EcVolumeInfo) GetShardSize(id ShardId) (int64, bool) { - if index, found := ecInfo.ShardBits.ShardIdToIndex(id); found && index < len(ecInfo.ShardSizes) { - return ecInfo.ShardSizes[index], true +// Returns a string representation for a ShardsInfo. +func (sp *ShardsInfo) String() string { + var res string + ids := sp.Ids() + for i, id := range sp.Ids() { + res += fmt.Sprintf("%d:%s", id, humanize.Bytes(uint64(sp.shards[id].Size))) + if i < len(ids)-1 { + res += " " + } } - return 0, false + return res } -func (ecInfo *EcVolumeInfo) GetTotalSize() int64 { - var total int64 - for _, size := range ecInfo.ShardSizes { - total += size +// AsSlice converts a ShardsInfo to a slice of ShardInfo structs, ordered by shard ID. +func (si *ShardsInfo) AsSlice() []*ShardInfo { + res := make([]*ShardInfo, len(si.shards)) + i := 0 + for _, id := range si.Ids() { + res[i] = si.shards[id] + i++ } - return total -} -func (ecInfo *EcVolumeInfo) HasShardId(id ShardId) bool { - return ecInfo.ShardBits.HasShardId(id) + return res } -func (ecInfo *EcVolumeInfo) ShardIds() (ret []ShardId) { - return ecInfo.ShardBits.ShardIds() +// Count returns the number of EC shards. +func (si *ShardsInfo) Count() int { + return len(si.shards) } -func (ecInfo *EcVolumeInfo) ShardIdCount() (count int) { - return ecInfo.ShardBits.ShardIdCount() +// Has verifies if a shard ID is present. +func (si *ShardsInfo) Has(id ShardId) bool { + _, ok := si.shards[id] + return ok } -func (ecInfo *EcVolumeInfo) Minus(other *EcVolumeInfo) *EcVolumeInfo { - ret := &EcVolumeInfo{ - VolumeId: ecInfo.VolumeId, - Collection: ecInfo.Collection, - ShardBits: ecInfo.ShardBits.Minus(other.ShardBits), - DiskType: ecInfo.DiskType, - DiskId: ecInfo.DiskId, - ExpireAtSec: ecInfo.ExpireAtSec, +// Ids returns a list of shard IDs, in ascending order. 
+func (si *ShardsInfo) Ids() []ShardId { + ids := []ShardId{} + for id := range si.shards { + ids = append(ids, id) } + sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] }) - // Initialize optimized ShardSizes for the result - ret.ensureShardSizesInitialized() - - // Copy shard sizes for remaining shards - retIndex := 0 - for shardId := ShardId(0); shardId < ShardId(MaxShardCount) && retIndex < len(ret.ShardSizes); shardId++ { - if ret.ShardBits.HasShardId(shardId) { - if size, exists := ecInfo.GetShardSize(shardId); exists { - ret.ShardSizes[retIndex] = size - } - retIndex++ - } - } - - return ret + return ids } -func (ecInfo *EcVolumeInfo) ToVolumeEcShardInformationMessage() (ret *master_pb.VolumeEcShardInformationMessage) { - t := &master_pb.VolumeEcShardInformationMessage{ - Id: uint32(ecInfo.VolumeId), - EcIndexBits: uint32(ecInfo.ShardBits), - Collection: ecInfo.Collection, - DiskType: ecInfo.DiskType, - ExpireAtSec: ecInfo.ExpireAtSec, - DiskId: ecInfo.DiskId, +// IdsInt returns a list of shards ID as int, in ascending order. +func (si *ShardsInfo) IdsInt() []int { + ids := si.Ids() + res := make([]int, len(ids)) + for i, id := range ids { + res[i] = int(id) } - // Directly set the optimized ShardSizes - t.ShardSizes = make([]int64, len(ecInfo.ShardSizes)) - copy(t.ShardSizes, ecInfo.ShardSizes) - - return t + return res } -type ShardBits uint32 // use bits to indicate the shard id, use 32 bits just for possible future extension +// Ids returns a list of shards ID as uint32, in ascending order. +func (si *ShardsInfo) IdsUint32() []uint32 { + return ShardIdsToUint32(si.Ids()) +} -func (b ShardBits) AddShardId(id ShardId) ShardBits { +// Set sets the size for a given shard ID. +func (si *ShardsInfo) Set(id ShardId, size ShardSize) { if id >= MaxShardCount { - return b // Reject out-of-range shard IDs + return + } + si.shards[id] = &ShardInfo{ + Id: id, + Size: size, } - return b | (1 << id) } -func (b ShardBits) RemoveShardId(id ShardId) ShardBits { +// Delete deletes a shard by ID. +func (si *ShardsInfo) Delete(id ShardId) { if id >= MaxShardCount { - return b // Reject out-of-range shard IDs + return + } + if _, ok := si.shards[id]; ok { + delete(si.shards, id) } - return b &^ (1 << id) } -func (b ShardBits) HasShardId(id ShardId) bool { - if id >= MaxShardCount { - return false // Out-of-range shard IDs are never present +// Bitmap returns a bitmap for all existing shard IDs (bit 0 = shard #0... bit 31 = shard #31), in little endian. +func (si *ShardsInfo) Bitmap() uint32 { + var bits uint32 + for id := range si.shards { + bits |= (1 << id) } - return b&(1< 0 + return bits } -func (b ShardBits) ShardIds() (ret []ShardId) { - for i := ShardId(0); i < ShardId(MaxShardCount); i++ { - if b.HasShardId(i) { - ret = append(ret, i) - } +// Size returns the size of a given shard ID, if present. +func (si *ShardsInfo) Size(id ShardId) ShardSize { + if s, ok := si.shards[id]; ok { + return s.Size } - return + return 0 } -func (b ShardBits) ToUint32Slice() (ret []uint32) { - for i := uint32(0); i < uint32(MaxShardCount); i++ { - if b.HasShardId(ShardId(i)) { - ret = append(ret, i) - } +// TotalSize returns the size for all shards. +func (si *ShardsInfo) TotalSize() ShardSize { + var total ShardSize + for _, s := range si.shards { + total += s.Size } - return + return total } -func (b ShardBits) ShardIdCount() (count int) { - for count = 0; b > 0; count++ { - b &= b - 1 +// Sizes returns a compact slice of present shard sizes, from first to last. 
+func (si *ShardsInfo) Sizes() []ShardSize { + ids := si.Ids() + + res := make([]ShardSize, len(ids)) + if len(res) != 0 { + var i int + for _, id := range ids { + res[i] = si.shards[id].Size + i++ + } } - return -} -func (b ShardBits) Minus(other ShardBits) ShardBits { - return b &^ other + return res } -func (b ShardBits) Plus(other ShardBits) ShardBits { - return b | other +// SizesInt64 returns a compact slice of present shard sizes, from first to last, as int64. +func (si *ShardsInfo) SizesInt64() []int64 { + res := make([]int64, si.Count()) + + for i, s := range si.Sizes() { + res[i] = int64(s) + } + return res } -func (b ShardBits) MinusParityShards() ShardBits { - // Removes parity shards from the bit mask - // Assumes default 10+4 EC layout where parity shards are IDs 10-13 - for i := DataShardsCount; i < TotalShardsCount; i++ { - b = b.RemoveShardId(ShardId(i)) +// Copy creates a copy of a ShardInfo. +func (si *ShardsInfo) Copy() *ShardsInfo { + new := NewShardsInfo() + for _, s := range si.shards { + new.Set(s.Id, s.Size) } - return b + return new } -// ShardIdToIndex converts a shard ID to its index position in the ShardSizes slice -// Returns the index and true if the shard is present, -1 and false if not present -func (b ShardBits) ShardIdToIndex(shardId ShardId) (index int, found bool) { - if !b.HasShardId(shardId) { - return -1, false +// DeleteParityShards removes party shards from a ShardInfo. +// Assumes default 10+4 EC layout where parity shards are IDs 10-13. +func (si *ShardsInfo) DeleteParityShards() { + for id := DataShardsCount; id < TotalShardsCount; id++ { + si.Delete(ShardId(id)) } +} - // Create a mask for bits before the shardId - mask := uint32((1 << shardId) - 1) - // Count set bits before the shardId using efficient bit manipulation - index = bits.OnesCount32(uint32(b) & mask) - return index, true +// MinusParityShards creates a ShardInfo copy, but with parity shards removed. +func (si *ShardsInfo) MinusParityShards() *ShardsInfo { + new := si.Copy() + new.DeleteParityShards() + return new } -// EachSetIndex iterates over all set shard IDs and calls the provided function for each -// This is highly efficient using bit manipulation - only iterates over actual set bits -func (b ShardBits) EachSetIndex(fn func(shardId ShardId)) { - bitsValue := uint32(b) - for bitsValue != 0 { - // Find the position of the least significant set bit - shardId := ShardId(bits.TrailingZeros32(bitsValue)) - fn(shardId) - // Clear the least significant set bit - bitsValue &= bitsValue - 1 +// Add merges all shards from another ShardInfo into this one. +func (si *ShardsInfo) Add(other *ShardsInfo) { + for _, s := range other.shards { + si.Set(s.Id, s.Size) } } -// IndexToShardId converts an index position in ShardSizes slice to the corresponding shard ID -// Returns the shard ID and true if valid index, -1 and false if invalid index -func (b ShardBits) IndexToShardId(index int) (shardId ShardId, found bool) { - if index < 0 { - return 0, false +// Subtract removes all shards present on another ShardInfo. +func (si *ShardsInfo) Subtract(other *ShardsInfo) { + for _, s := range other.shards { + si.Delete(s.Id) } +} - currentIndex := 0 - for i := ShardId(0); i < ShardId(MaxShardCount); i++ { - if b.HasShardId(i) { - if currentIndex == index { - return i, true - } - currentIndex++ - } - } - return 0, false // index out of range +// Plus returns a new ShardInfo consisting of (this + other). 
+func (si *ShardsInfo) Plus(other *ShardsInfo) *ShardsInfo { + new := si.Copy() + new.Add(other) + return new } -// Helper methods for EcVolumeInfo to manage the optimized ShardSizes slice -func (ecInfo *EcVolumeInfo) ensureShardSizesInitialized() { - expectedLength := ecInfo.ShardBits.ShardIdCount() - if ecInfo.ShardSizes == nil { - ecInfo.ShardSizes = make([]int64, expectedLength) - } else if len(ecInfo.ShardSizes) != expectedLength { - // Resize and preserve existing data - ecInfo.resizeShardSizes(ecInfo.ShardBits) - } +// Minus returns a new ShardInfo consisting of (this - other). +func (si *ShardsInfo) Minus(other *ShardsInfo) *ShardsInfo { + new := si.Copy() + new.Subtract(other) + return new } -func (ecInfo *EcVolumeInfo) resizeShardSizes(prevShardBits ShardBits) { - expectedLength := ecInfo.ShardBits.ShardIdCount() - newSizes := make([]int64, expectedLength) - - // Copy existing sizes to new positions based on current ShardBits - if len(ecInfo.ShardSizes) > 0 { - newIndex := 0 - for shardId := ShardId(0); shardId < ShardId(MaxShardCount) && newIndex < expectedLength; shardId++ { - if ecInfo.ShardBits.HasShardId(shardId) { - // Try to find the size for this shard in the old array using previous ShardBits - if oldIndex, found := prevShardBits.ShardIdToIndex(shardId); found && oldIndex < len(ecInfo.ShardSizes) { - newSizes[newIndex] = ecInfo.ShardSizes[oldIndex] - } - newIndex++ - } - } +// data structure used in master +type EcVolumeInfo struct { + VolumeId needle.VolumeId + Collection string + DiskType string + DiskId uint32 // ID of the disk this EC volume is on + ExpireAtSec uint64 // ec volume destroy time, calculated from the ec volume was created + ShardsInfo *ShardsInfo +} + +func (ecInfo *EcVolumeInfo) Minus(other *EcVolumeInfo) *EcVolumeInfo { + return &EcVolumeInfo{ + VolumeId: ecInfo.VolumeId, + Collection: ecInfo.Collection, + ShardsInfo: ecInfo.ShardsInfo.Minus(other.ShardsInfo), + DiskType: ecInfo.DiskType, + DiskId: ecInfo.DiskId, + ExpireAtSec: ecInfo.ExpireAtSec, } +} - ecInfo.ShardSizes = newSizes +func (evi *EcVolumeInfo) ToVolumeEcShardInformationMessage() (ret *master_pb.VolumeEcShardInformationMessage) { + return &master_pb.VolumeEcShardInformationMessage{ + Id: uint32(evi.VolumeId), + EcIndexBits: evi.ShardsInfo.Bitmap(), + ShardSizes: evi.ShardsInfo.SizesInt64(), + Collection: evi.Collection, + DiskType: evi.DiskType, + ExpireAtSec: evi.ExpireAtSec, + DiskId: evi.DiskId, + } } diff --git a/weed/storage/erasure_coding/ec_volume_info_test.go b/weed/storage/erasure_coding/ec_volume_info_test.go new file mode 100644 index 000000000..b942ebdcd --- /dev/null +++ b/weed/storage/erasure_coding/ec_volume_info_test.go @@ -0,0 +1,173 @@ +package erasure_coding_test + +import ( + "reflect" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + erasure_coding "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" +) + +func TestShardsInfoDeleteParityShards(t *testing.T) { + si := erasure_coding.NewShardsInfo() + for _, id := range erasure_coding.AllShardIds() { + si.Set(id, 123) + } + si.DeleteParityShards() + + if got, want := si.String(), "0:123 B 1:123 B 2:123 B 3:123 B 4:123 B 5:123 B 6:123 B 7:123 B 8:123 B 9:123 B"; got != want { + t.Errorf("expected %q, got %q", want, got) + } + +} + +func TestShardsInfoAsSlice(t *testing.T) { + si := erasure_coding.NewShardsInfo() + si.Set(5, 555) + si.Set(2, 222) + si.Set(7, 777) + si.Set(1, 111) + + want := []*erasure_coding.ShardInfo{ + &erasure_coding.ShardInfo{Id: 1, Size: 111}, + 
&erasure_coding.ShardInfo{Id: 2, Size: 222}, + &erasure_coding.ShardInfo{Id: 5, Size: 555}, + &erasure_coding.ShardInfo{Id: 7, Size: 777}, + } + if got := si.AsSlice(); !reflect.DeepEqual(got, want) { + t.Errorf("expected %v, got %v", want, got) + } +} + +func TestShardsInfoSerialize(t *testing.T) { + testCases := []struct { + name string + shardIds map[erasure_coding.ShardId]erasure_coding.ShardSize + wantBits uint32 + wantSizes []erasure_coding.ShardSize + }{ + { + name: "no bits", + shardIds: nil, + wantBits: 0b0, + wantSizes: []erasure_coding.ShardSize{}, + }, + { + name: "single shard, first", + shardIds: map[erasure_coding.ShardId]erasure_coding.ShardSize{ + 0: 2345, + }, + wantBits: 0b1, + wantSizes: []erasure_coding.ShardSize{2345}, + }, + { + name: "single shard, 5th", + shardIds: map[erasure_coding.ShardId]erasure_coding.ShardSize{ + 4: 6789, + }, + wantBits: 0b10000, + wantSizes: []erasure_coding.ShardSize{6789}, + }, + { + name: "multiple shards", + shardIds: map[erasure_coding.ShardId]erasure_coding.ShardSize{ + 8: 800, + 0: 5, + 3: 300, + 1: 100, + }, + wantBits: 0b100001011, + wantSizes: []erasure_coding.ShardSize{5, 100, 300, 800}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + si := erasure_coding.NewShardsInfo() + for id, size := range tc.shardIds { + si.Set(id, size) + } + + if got, want := si.Bitmap(), tc.wantBits; got != want { + t.Errorf("expected bits %v, got %v", want, got) + } + if got, want := si.Sizes(), tc.wantSizes; !reflect.DeepEqual(got, want) { + t.Errorf("expected sizes %v, got %v", want, got) + } + }) + } +} + +func TestShardsInfoFromVolumeEcShardInformationMessage(t *testing.T) { + testCases := []struct { + name string + ecvInfo *master_pb.VolumeEcShardInformationMessage + want string + }{ + { + name: "no msg", + ecvInfo: nil, + want: "", + }, + { + name: "no shards", + ecvInfo: &master_pb.VolumeEcShardInformationMessage{}, + want: "", + }, + { + name: "single shard", + ecvInfo: &master_pb.VolumeEcShardInformationMessage{ + EcIndexBits: 0b100, + ShardSizes: []int64{333}, + }, + want: "2:333 B", + }, + { + name: "multiple shards", + ecvInfo: &master_pb.VolumeEcShardInformationMessage{ + EcIndexBits: 0b1101, + ShardSizes: []int64{111, 333, 444}, + }, + want: "0:111 B 2:333 B 3:444 B", + }, + { + name: "multiple shards with missing sizes", + ecvInfo: &master_pb.VolumeEcShardInformationMessage{ + EcIndexBits: 0b110110, + ShardSizes: []int64{111, 333, 444}, + }, + want: "1:111 B 2:333 B 4:444 B 5:0 B", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + si := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(tc.ecvInfo) + if got, want := si.String(), tc.want; got != want { + t.Errorf("expected %q, got %q", want, got) + } + }) + } +} + +func TestShardsInfoCombine(t *testing.T) { + a := erasure_coding.NewShardsInfo() + a.Set(1, 111) + a.Set(2, 222) + a.Set(3, 333) + a.Set(4, 444) + a.Set(5, 0) + + b := erasure_coding.NewShardsInfo() + b.Set(1, 555) + b.Set(4, 666) + b.Set(5, 777) + b.Set(6, 888) + + if got, want := a.Plus(b).String(), "1:555 B 2:222 B 3:333 B 4:666 B 5:777 B 6:888 B"; got != want { + t.Errorf("expected %q for plus, got %q", want, got) + } + if got, want := a.Minus(b).String(), "2:222 B 3:333 B"; got != want { + t.Errorf("expected %q for minus, got %q", want, got) + } +} diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index 6a26b4ae0..0d30128a6 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -53,12 +53,13 @@ func (s *Store) 
MountEcShards(collection string, vid needle.VolumeId, shardId er if ecVolume, err := location.LoadEcShard(collection, vid, shardId); err == nil { glog.V(0).Infof("MountEcShards %d.%d on disk ID %d", vid, shardId, diskId) - var shardBits erasure_coding.ShardBits - + si := erasure_coding.NewShardsInfo() + si.Set(shardId, erasure_coding.ShardSize(ecVolume.ShardSize())) s.NewEcShardsChan <- master_pb.VolumeEcShardInformationMessage{ Id: uint32(vid), Collection: collection, - EcIndexBits: uint32(shardBits.AddShardId(shardId)), + EcIndexBits: uint32(si.Bitmap()), + ShardSizes: si.SizesInt64(), DiskType: string(location.DiskType), ExpireAtSec: ecVolume.ExpireAtSec, DiskId: uint32(diskId), @@ -75,17 +76,18 @@ func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId er } func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.ShardId) error { - diskId, ecShard, found := s.findEcShard(vid, shardId) if !found { return nil } - var shardBits erasure_coding.ShardBits + si := erasure_coding.NewShardsInfo() + si.Set(shardId, 0) message := master_pb.VolumeEcShardInformationMessage{ Id: uint32(vid), Collection: ecShard.Collection, - EcIndexBits: uint32(shardBits.AddShardId(shardId)), + EcIndexBits: si.Bitmap(), + ShardSizes: si.SizesInt64(), DiskType: string(ecShard.DiskType), DiskId: diskId, } diff --git a/weed/topology/data_node_ec.go b/weed/topology/data_node_ec.go index 839499982..ee428fd7c 100644 --- a/weed/topology/data_node_ec.go +++ b/weed/topology/data_node_ec.go @@ -35,18 +35,18 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) if actualEcShards, ok := actualEcShardMap[vid]; !ok { // dn registered ec shards not found in the new set of ec shards deletedShards = append(deletedShards, ecShards) - deletedShardCount += ecShards.ShardIdCount() + deletedShardCount += ecShards.ShardsInfo.Count() } else { // found, but maybe the actual shard could be missing a := actualEcShards.Minus(ecShards) - if a.ShardIdCount() > 0 { + if a.ShardsInfo.Count() > 0 { newShards = append(newShards, a) - newShardCount += a.ShardIdCount() + newShardCount += a.ShardsInfo.Count() } d := ecShards.Minus(actualEcShards) - if d.ShardIdCount() > 0 { + if d.ShardsInfo.Count() > 0 { deletedShards = append(deletedShards, d) - deletedShardCount += d.ShardIdCount() + deletedShardCount += d.ShardsInfo.Count() } } @@ -67,7 +67,7 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) disk := dn.getOrCreateDisk(ecShards.DiskType) disk.UpAdjustDiskUsageDelta(types.ToDiskType(ecShards.DiskType), &DiskUsageCounts{ - ecShardCount: int64(ecShards.ShardIdCount()), + ecShardCount: int64(ecShards.ShardsInfo.Count()), }) } @@ -106,7 +106,6 @@ func (dn *DataNode) doUpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo } func (dn *DataNode) DeltaUpdateEcShards(newShards, deletedShards []*erasure_coding.EcVolumeInfo) { - for _, newShard := range newShards { dn.AddOrUpdateEcShard(newShard) } diff --git a/weed/topology/disk_ec.go b/weed/topology/disk_ec.go index 1fea29272..a783b309e 100644 --- a/weed/topology/disk_ec.go +++ b/weed/topology/disk_ec.go @@ -22,20 +22,18 @@ func (d *Disk) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) { delta := 0 if existing, ok := d.ecShards[s.VolumeId]; !ok { d.ecShards[s.VolumeId] = s - delta = s.ShardBits.ShardIdCount() + delta = s.ShardsInfo.Count() } else { - oldCount := existing.ShardBits.ShardIdCount() - existing.ShardBits = existing.ShardBits.Plus(s.ShardBits) - delta = existing.ShardBits.ShardIdCount() - 
oldCount + oldCount := existing.ShardsInfo.Count() + existing.ShardsInfo.Add(s.ShardsInfo) + delta = existing.ShardsInfo.Count() - oldCount } - if delta == 0 { - return + if delta != 0 { + d.UpAdjustDiskUsageDelta(types.ToDiskType(string(d.Id())), &DiskUsageCounts{ + ecShardCount: int64(delta), + }) } - d.UpAdjustDiskUsageDelta(types.ToDiskType(string(d.Id())), &DiskUsageCounts{ - ecShardCount: int64(delta), - }) - } func (d *Disk) DeleteEcShard(s *erasure_coding.EcVolumeInfo) { @@ -43,17 +41,16 @@ func (d *Disk) DeleteEcShard(s *erasure_coding.EcVolumeInfo) { defer d.ecShardsLock.Unlock() if existing, ok := d.ecShards[s.VolumeId]; ok { - oldCount := existing.ShardBits.ShardIdCount() - existing.ShardBits = existing.ShardBits.Minus(s.ShardBits) - delta := existing.ShardBits.ShardIdCount() - oldCount + oldCount := existing.ShardsInfo.Count() + existing.ShardsInfo.Subtract(s.ShardsInfo) + delta := existing.ShardsInfo.Count() - oldCount if delta != 0 { d.UpAdjustDiskUsageDelta(types.ToDiskType(string(d.Id())), &DiskUsageCounts{ ecShardCount: int64(delta), }) } - - if existing.ShardBits.ShardIdCount() == 0 { + if existing.ShardsInfo.Count() == 0 { delete(d.ecShards, s.VolumeId) } } @@ -61,7 +58,6 @@ func (d *Disk) DeleteEcShard(s *erasure_coding.EcVolumeInfo) { } func (d *Disk) HasVolumesById(id needle.VolumeId) (hasVolumeId bool) { - // check whether normal volumes has this volume id d.RLock() _, ok := d.volumes[id] @@ -83,5 +79,4 @@ func (d *Disk) HasVolumesById(id needle.VolumeId) (hasVolumeId bool) { d.ecShardsLock.RUnlock() return - } diff --git a/weed/topology/topology_ec.go b/weed/topology/topology_ec.go index c8b511338..827e2e801 100644 --- a/weed/topology/topology_ec.go +++ b/weed/topology/topology_ec.go @@ -22,11 +22,10 @@ func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInf ecVolumeInfo := &erasure_coding.EcVolumeInfo{ VolumeId: needle.VolumeId(shardInfo.Id), Collection: shardInfo.Collection, - ShardBits: erasure_coding.ShardBits(shardInfo.EcIndexBits), + ShardsInfo: erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(shardInfo), DiskType: shardInfo.DiskType, DiskId: shardInfo.DiskId, ExpireAtSec: shardInfo.ExpireAtSec, - ShardSizes: shardInfo.ShardSizes, } shards = append(shards, ecVolumeInfo) @@ -50,11 +49,10 @@ func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards ecVolumeInfo := &erasure_coding.EcVolumeInfo{ VolumeId: needle.VolumeId(shardInfo.Id), Collection: shardInfo.Collection, - ShardBits: erasure_coding.ShardBits(shardInfo.EcIndexBits), + ShardsInfo: erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(shardInfo), DiskType: shardInfo.DiskType, DiskId: shardInfo.DiskId, ExpireAtSec: shardInfo.ExpireAtSec, - ShardSizes: shardInfo.ShardSizes, } newShards = append(newShards, ecVolumeInfo) @@ -64,11 +62,10 @@ func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards ecVolumeInfo := &erasure_coding.EcVolumeInfo{ VolumeId: needle.VolumeId(shardInfo.Id), Collection: shardInfo.Collection, - ShardBits: erasure_coding.ShardBits(shardInfo.EcIndexBits), + ShardsInfo: erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(shardInfo), DiskType: shardInfo.DiskType, DiskId: shardInfo.DiskId, ExpireAtSec: shardInfo.ExpireAtSec, - ShardSizes: shardInfo.ShardSizes, } deletedShards = append(deletedShards, ecVolumeInfo) @@ -124,31 +121,31 @@ func (loc *EcShardLocations) DeleteShard(shardId erasure_coding.ShardId, dn *Dat return true } -func (t *Topology) RegisterEcShards(ecShardInfos 
*erasure_coding.EcVolumeInfo, dn *DataNode) { +func (t *Topology) RegisterEcShards(ecvi *erasure_coding.EcVolumeInfo, dn *DataNode) { t.ecShardMapLock.Lock() defer t.ecShardMapLock.Unlock() - locations, found := t.ecShardMap[ecShardInfos.VolumeId] + locations, found := t.ecShardMap[ecvi.VolumeId] if !found { - locations = NewEcShardLocations(ecShardInfos.Collection) - t.ecShardMap[ecShardInfos.VolumeId] = locations + locations = NewEcShardLocations(ecvi.Collection) + t.ecShardMap[ecvi.VolumeId] = locations } - for _, shardId := range ecShardInfos.ShardIds() { + for _, shardId := range ecvi.ShardsInfo.Ids() { locations.AddShard(shardId, dn) } } -func (t *Topology) UnRegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) { - glog.Infof("removing ec shard info:%+v", ecShardInfos) +func (t *Topology) UnRegisterEcShards(ecvi *erasure_coding.EcVolumeInfo, dn *DataNode) { + glog.Infof("removing ec shard info:%+v", ecvi) t.ecShardMapLock.Lock() defer t.ecShardMapLock.Unlock() - locations, found := t.ecShardMap[ecShardInfos.VolumeId] + locations, found := t.ecShardMap[ecvi.VolumeId] if !found { return } - for _, shardId := range ecShardInfos.ShardIds() { + for _, shardId := range ecvi.ShardsInfo.Ids() { locations.DeleteShard(shardId, dn) } } diff --git a/weed/topology/topology_test.go b/weed/topology/topology_test.go index e5a8969fc..fa34e4db2 100644 --- a/weed/topology/topology_test.go +++ b/weed/topology/topology_test.go @@ -351,10 +351,12 @@ func TestListCollections(t *testing.T) { topo.RegisterEcShards(&erasure_coding.EcVolumeInfo{ VolumeId: needle.VolumeId(4444), Collection: "ec_collection_a", + ShardsInfo: erasure_coding.NewShardsInfo(), }, dn) topo.RegisterEcShards(&erasure_coding.EcVolumeInfo{ VolumeId: needle.VolumeId(5555), Collection: "ec_collection_b", + ShardsInfo: erasure_coding.NewShardsInfo(), }, dn) testCases := []struct { diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go index 1beb910a3..3de358c4d 100644 --- a/weed/worker/tasks/erasure_coding/detection.go +++ b/weed/worker/tasks/erasure_coding/detection.go @@ -503,7 +503,7 @@ func diskInfosToCandidates(disks []*topology.DiskInfo) []*placement.DiskCandidat if disk.DiskInfo.EcShardInfos != nil { for _, shardInfo := range disk.DiskInfo.EcShardInfos { if shardInfo.DiskId == disk.DiskID { - ecShardCount += erasure_coding.ShardBits(shardInfo.EcIndexBits).ShardIdCount() + ecShardCount += erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(shardInfo) } } }
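
For reviewers unfamiliar with the new type: the sketches below are illustrative only and are not part of the patch. They assume the `erasure_coding.ShardsInfo` API exactly as added in `weed/storage/erasure_coding/ec_volume_info.go`; the standalone `main` packages, volume IDs, and sizes are made up. This first one shows the intended round trip between a `ShardsInfo` and the `master_pb.VolumeEcShardInformationMessage` that carries shard IDs and sizes between volume servers and masters.

```
package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
)

func main() {
	// A node holding shards 0, 4 and 8 of an EC volume, 8 MiB each.
	si := erasure_coding.NewShardsInfo()
	si.Set(0, 8<<20)
	si.Set(4, 8<<20)
	si.Set(8, 8<<20)

	// On the wire, the shard set is a bitmap (one bit per shard ID) and the
	// sizes are a compact slice with one entry per set bit, in ascending
	// shard-ID order.
	msg := &master_pb.VolumeEcShardInformationMessage{
		Id:          1,
		Collection:  "c1",
		EcIndexBits: si.Bitmap(),     // 0b1_0001_0001
		ShardSizes:  si.SizesInt64(), // [8388608 8388608 8388608]
	}

	// The receiving side decodes the same message back into a ShardsInfo.
	decoded := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(msg)
	fmt.Println(decoded.Ids())       // [0 4 8]
	fmt.Println(decoded.TotalSize()) // 25165824
	for _, id := range decoded.Ids() {
		fmt.Printf("shard %d: %d bytes\n", id, decoded.Size(id))
	}
}
```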
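
Another sketch, under the same assumptions, of how a consumer can count shards and read sizes straight from the wire message via `ShardsCountFromVolumeEcShardInformationMessage`; the decode path pads missing trailing sizes with zero, so messages without `ShardSizes` still parse.

```
package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
)

func main() {
	// Shell helpers that previously did ShardBits(msg.EcIndexBits).ShardIdCount()
	// can now count shards from the proto directly.
	msg := &master_pb.VolumeEcShardInformationMessage{
		EcIndexBits: 0b1101,                 // shards 0, 2 and 3 present
		ShardSizes:  []int64{111, 333, 444}, // one entry per set bit, ascending
	}
	fmt.Println(erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(msg)) // 3

	// Missing trailing sizes decode as zero instead of failing, so older
	// messages that carry only EcIndexBits still parse.
	si := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(
		&master_pb.VolumeEcShardInformationMessage{EcIndexBits: 0b11})
	fmt.Println(si.Size(0), si.Size(1)) // 0 0
}
```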
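
A further sketch, same assumptions, of the set arithmetic on `ShardsInfo` (`Minus`, `Plus`, `Add`) that replaces the old `ShardBits` masks in the topology delta-sync code; the shard IDs and sizes here are arbitrary.

```
package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
)

func main() {
	// What the master currently has registered for one EC volume...
	registered := erasure_coding.NewShardsInfo()
	registered.Set(0, 100)
	registered.Set(1, 100)
	registered.Set(2, 100)

	// ...versus what the volume server actually reports.
	actual := erasure_coding.NewShardsInfo()
	actual.Set(1, 100)
	actual.Set(2, 100)
	actual.Set(3, 100)

	// Set differences yield the per-volume deltas.
	fmt.Println(actual.Minus(registered).Ids()) // [3]  newly reported
	fmt.Println(registered.Minus(actual).Ids()) // [0]  no longer reported

	// Plus merges two sets; on colliding IDs the argument's size wins,
	// because Add overwrites entries via Set.
	fmt.Println(registered.Plus(actual).Count()) // 4
}
```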
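
One last sketch of the small ID helpers added to `ec_shard.go` (`Uint32ToShardIds`, `ShardIdsToUint32`, `AllShardIds`); the shard counts noted in the comments assume the default 10+4 EC layout.

```
package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
)

func main() {
	// Legacy call sites that still traffic in raw uint32 shard IDs can
	// convert in both directions without hand-rolled loops.
	ids := erasure_coding.Uint32ToShardIds([]uint32{0, 4, 8})
	fmt.Println(erasure_coding.ShardIdsToUint32(ids)) // [0 4 8]

	// AllShardIds enumerates every possible shard ID; combined with Set it
	// builds a fully populated ShardsInfo, as the rebalance tests do.
	full := erasure_coding.NewShardsInfo()
	for _, id := range erasure_coding.AllShardIds() {
		full.Set(id, 1234)
	}
	fmt.Println(full.Count())                     // 14 in the default 10+4 layout
	fmt.Println(full.MinusParityShards().Count()) // 10 after stripping parity shards 10-13
}
```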