diff --git a/.gitignore b/.gitignore index 7bca32242..91fa9391d 100644 --- a/.gitignore +++ b/.gitignore @@ -130,3 +130,4 @@ coverage.out /test/s3/remote_cache/test-remote-data test/s3/remote_cache/remote-server.pid test/s3/remote_cache/primary-server.pid +/test/erasure_coding/filerldb2 diff --git a/test/erasure_coding/Makefile b/test/erasure_coding/Makefile new file mode 100644 index 000000000..59db5cd07 --- /dev/null +++ b/test/erasure_coding/Makefile @@ -0,0 +1,187 @@ +# Makefile for EC integration testing +# Usage: +# make start - Start the test cluster (master + 6 volume servers + filer) +# make stop - Stop the test cluster +# make populate - Populate test data (~300MB across 7 volumes) +# make shell - Open weed shell connected to the test cluster +# make clean - Stop cluster and remove all test data +# make setup - Start cluster and populate data (one command) +# +# Requirements: curl, jq + +WEED_BINARY := $(shell pwd)/../../weed/weed +TEST_DIR := /tmp/ec_manual_test +# Use non-standard ports to avoid conflicts with existing SeaweedFS servers +MASTER_PORT := 29333 +FILER_PORT := 28888 +VOLUME_BASE_PORT := 28080 +NUM_VOLUME_SERVERS := 6 +VOLUME_SIZE_LIMIT_MB := 30 +MAX_VOLUMES_PER_SERVER := 10 + +# Build weed binary if it doesn't exist +$(WEED_BINARY): + cd ../../weed && go build -o weed . + +.PHONY: build +build: $(WEED_BINARY) + +.PHONY: start +start: build + @echo "=== Starting SeaweedFS test cluster ===" + @mkdir -p $(TEST_DIR)/master $(TEST_DIR)/filer + @for i in $$(seq 0 $$(($(NUM_VOLUME_SERVERS)-1))); do mkdir -p $(TEST_DIR)/volume$$i; done + @# Create security.toml with JWT disabled + @echo "# Disable JWT for testing" > $(TEST_DIR)/security.toml + @echo '[jwt.signing]' >> $(TEST_DIR)/security.toml + @echo 'key = ""' >> $(TEST_DIR)/security.toml + @echo 'expires_after_seconds = 0' >> $(TEST_DIR)/security.toml + @echo '' >> $(TEST_DIR)/security.toml + @echo '[jwt.signing.read]' >> $(TEST_DIR)/security.toml + @echo 'key = ""' >> $(TEST_DIR)/security.toml + @echo 'expires_after_seconds = 0' >> $(TEST_DIR)/security.toml + @# Create filer.toml with leveldb2 + @echo '[leveldb2]' > $(TEST_DIR)/filer.toml + @echo 'enabled = true' >> $(TEST_DIR)/filer.toml + @echo 'dir = "$(TEST_DIR)/filer/filerldb2"' >> $(TEST_DIR)/filer.toml + @# Start master + @echo "Starting master on port $(MASTER_PORT)..." + @cd $(TEST_DIR) && $(WEED_BINARY) master \ + -port=$(MASTER_PORT) \ + -mdir=$(TEST_DIR)/master \ + -volumeSizeLimitMB=$(VOLUME_SIZE_LIMIT_MB) \ + -ip=127.0.0.1 \ + > $(TEST_DIR)/master/master.log 2>&1 & echo $$! > $(TEST_DIR)/master.pid + @sleep 3 + @# Start volume servers (run from TEST_DIR to find security.toml) + @for i in $$(seq 0 $$(($(NUM_VOLUME_SERVERS)-1))); do \ + port=$$(($(VOLUME_BASE_PORT) + $$i)); \ + echo "Starting volume server $$i on port $$port (rack$$i)..."; \ + cd $(TEST_DIR) && $(WEED_BINARY) volume \ + -port=$$port \ + -dir=$(TEST_DIR)/volume$$i \ + -max=$(MAX_VOLUMES_PER_SERVER) \ + -master=127.0.0.1:$(MASTER_PORT) \ + -ip=127.0.0.1 \ + -dataCenter=dc1 \ + -rack=rack$$i \ + > $(TEST_DIR)/volume$$i/volume.log 2>&1 & echo $$! > $(TEST_DIR)/volume$$i.pid; \ + done + @sleep 3 + @# Start filer (run from TEST_DIR to find security.toml) + @echo "Starting filer on port $(FILER_PORT)..." + @cd $(TEST_DIR) && $(WEED_BINARY) filer \ + -port=$(FILER_PORT) \ + -master=127.0.0.1:$(MASTER_PORT) \ + -ip=127.0.0.1 \ + > $(TEST_DIR)/filer/filer.log 2>&1 & echo $$! 
> $(TEST_DIR)/filer.pid + @sleep 3 + @echo "" + @echo "=== Cluster started ===" + @echo "Master: http://127.0.0.1:$(MASTER_PORT)" + @echo "Filer: http://127.0.0.1:$(FILER_PORT)" + @echo "Volume servers: http://127.0.0.1:$(VOLUME_BASE_PORT) - http://127.0.0.1:$$(($(VOLUME_BASE_PORT) + $(NUM_VOLUME_SERVERS) - 1))" + @echo "" + @echo "Run 'make shell' to open weed shell" + @echo "Run 'make populate' to add test data" + +.PHONY: stop +stop: + @echo "=== Stopping SeaweedFS test cluster ===" + @# Stop filer by PID + @-[ -f $(TEST_DIR)/filer.pid ] && kill $$(cat $(TEST_DIR)/filer.pid) 2>/dev/null && rm -f $(TEST_DIR)/filer.pid || true + @# Stop volume servers by PID + @for i in $$(seq 0 $$(($(NUM_VOLUME_SERVERS)-1))); do \ + [ -f $(TEST_DIR)/volume$$i.pid ] && kill $$(cat $(TEST_DIR)/volume$$i.pid) 2>/dev/null && rm -f $(TEST_DIR)/volume$$i.pid || true; \ + done + @# Stop master by PID + @-[ -f $(TEST_DIR)/master.pid ] && kill $$(cat $(TEST_DIR)/master.pid) 2>/dev/null && rm -f $(TEST_DIR)/master.pid || true + @# Fallback: use pkill with specific patterns to ensure cleanup + @-pkill -f "weed filer.*-master=127.0.0.1:$(MASTER_PORT)" 2>/dev/null || true + @-pkill -f "weed volume.*-dir=$(TEST_DIR)/volume" 2>/dev/null || true + @-pkill -f "weed master.*-mdir=$(TEST_DIR)/master" 2>/dev/null || true + @echo "Cluster stopped." + +.PHONY: clean +clean: stop + @echo "Removing test data..." + @rm -rf $(TEST_DIR) + @echo "Clean complete." + +.PHONY: populate +populate: + @echo "=== Populating test data (~300MB) ===" + @# Create a 500KB test file template using mktemp for isolation + @tmpfile=$$(mktemp) && \ + dd if=/dev/urandom bs=1024 count=500 of=$$tmpfile 2>/dev/null && \ + uploaded=0; \ + for i in $$(seq 1 600); do \ + response=$$(curl -s "http://127.0.0.1:$(MASTER_PORT)/dir/assign?collection=ectest&replication=000"); \ + fid=$$(echo $$response | jq -r '.fid'); \ + url=$$(echo $$response | jq -r '.url'); \ + if [ "$$fid" != "null" ] && [ -n "$$fid" ]; then \ + curl -s -F "file=@$$tmpfile;filename=file_$$i.bin" "http://$$url/$$fid" > /dev/null; \ + uploaded=$$((uploaded + 1)); \ + fi; \ + if [ $$((i % 100)) -eq 0 ]; then \ + echo "Uploaded $$uploaded files..."; \ + fi; \ + done; \ + rm -f $$tmpfile; \ + echo ""; \ + echo "=== Data population complete ==="; \ + echo "Uploaded $$uploaded files (~$$((uploaded * 500 / 1024))MB)" + @echo "" + @echo "Volume status:" + @curl -s "http://127.0.0.1:$(MASTER_PORT)/vol/status" | jq -r \ + '.Volumes.DataCenters.dc1 | to_entries[] | .key as $$rack | .value | to_entries[] | select(.value != null) | .key as $$server | .value[] | select(.Collection == "ectest") | " Volume \(.Id): \(.FileCount) files, \((.Size/1048576*10|floor)/10)MB - \($$rack)"' 2>/dev/null || true + +.PHONY: shell +shell: build + @echo "Opening weed shell..." 
+ @echo "Commands to try:" + @echo " lock" + @echo " volume.list" + @echo " ec.encode -collection=ectest -quietFor=1s -force" + @echo " ec.balance -collection=ectest" + @echo " unlock" + @echo "" + @$(WEED_BINARY) shell -master=127.0.0.1:$(MASTER_PORT) -filer=127.0.0.1:$(FILER_PORT) + +.PHONY: setup +setup: clean start + @sleep 2 + @$(MAKE) populate + +.PHONY: status +status: + @echo "=== Cluster Status ===" + @curl -s "http://127.0.0.1:$(MASTER_PORT)/vol/status" | jq -r \ + '.Volumes.DataCenters.dc1 | to_entries[] | .key as $$rack | .value | to_entries[] | select(.value != null) | .key as $$server | .value[] | select(.Collection == "ectest") | "Volume \(.Id): \(.FileCount) files, \((.Size/1048576*10|floor)/10)MB - \($$rack) (\($$server))"' 2>/dev/null | sort -t: -k1 -n || echo "Cluster not running" + @echo "" + @echo "=== EC Shards ===" + @for i in $$(seq 0 $$(($(NUM_VOLUME_SERVERS)-1))); do \ + count=$$(ls $(TEST_DIR)/volume$$i/*.ec[0-9]* 2>/dev/null | wc -l | tr -d ' '); \ + if [ "$$count" != "0" ]; then \ + echo " volume$$i (port $$(($(VOLUME_BASE_PORT) + $$i))): $$count EC shard files"; \ + fi; \ + done + +.PHONY: help +help: + @echo "EC Integration Test Makefile" + @echo "" + @echo "Targets:" + @echo " make start - Start test cluster (master + 6 volume servers + filer)" + @echo " make stop - Stop test cluster" + @echo " make populate - Populate ~300MB of test data" + @echo " make shell - Open weed shell" + @echo " make setup - Clean, start, and populate (all-in-one)" + @echo " make status - Show cluster and EC shard status" + @echo " make clean - Stop cluster and remove all test data" + @echo " make help - Show this help" + @echo "" + @echo "Quick start:" + @echo " make setup # Start cluster and populate data" + @echo " make shell # Open shell to run EC commands" + diff --git a/test/erasure_coding/README.md b/test/erasure_coding/README.md index c04844982..2fc253d08 100644 --- a/test/erasure_coding/README.md +++ b/test/erasure_coding/README.md @@ -78,6 +78,43 @@ go test -v -run TestECEncodingMasterTimingRaceCondition go test -v -short ``` +## Manual Testing with Makefile + +A Makefile is provided for manual EC testing. + +**Requirements:** `curl`, `jq` (command-line JSON processor) + +```bash +# Quick start: start cluster and populate data +make setup + +# Open weed shell to run EC commands +make shell + +# Individual targets +make start # Start test cluster (master + 6 volume servers + filer) +make stop # Stop test cluster +make populate # Populate ~300MB of test data +make status # Show cluster and EC shard status +make clean # Stop cluster and remove all test data +make help # Show all targets +``` + +### EC Rebalance Limited Slots (Unit Test) + +The "no free ec shard slots" issue is tested with a **unit test** that works directly on +topology data structures without requiring a running cluster. + +**Location**: `weed/shell/ec_rebalance_slots_test.go` + +Tests included: +- `TestECRebalanceWithLimitedSlots`: Tests a topology with 6 servers, 7 EC volumes (98 shards) +- `TestECRebalanceZeroFreeSlots`: Reproduces the exact 0 free slots scenario + +**Known Issue**: When volume servers are at capacity (`volumeCount == maxVolumeCount`), +the rebalance step fails with "no free ec shard slots" instead of recognizing that +moving shards frees slots on source servers. + ## Test Results **With the fix**: Shows "Collecting volume locations for N volumes before EC encoding..." 
message diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index 388303ebe..6f9543ca4 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -679,6 +679,25 @@ type ecBalancer struct { applyBalancing bool maxParallelization int diskType types.DiskType // target disk type for EC shards (default: HardDriveType) + // EC configuration for shard distribution (defaults to 10+4) + dataShardCount int + parityShardCount int +} + +// getDataShardCount returns the configured data shard count, defaulting to standard 10 +func (ecb *ecBalancer) getDataShardCount() int { + if ecb.dataShardCount > 0 { + return ecb.dataShardCount + } + return erasure_coding.DataShardsCount +} + +// getParityShardCount returns the configured parity shard count, defaulting to standard 4 +func (ecb *ecBalancer) getParityShardCount() int { + if ecb.parityShardCount > 0 { + return ecb.parityShardCount + } + return erasure_coding.ParityShardsCount } func (ecb *ecBalancer) errorWaitGroup() *ErrorWaitGroup { @@ -785,59 +804,176 @@ func countShardsByRack(vid needle.VolumeId, locations []*EcNode, diskType types. }) } +// shardsByTypePerRack counts data shards (< dataShards) and parity shards (>= dataShards) per rack +func shardsByTypePerRack(vid needle.VolumeId, locations []*EcNode, diskType types.DiskType, dataShards int) (dataPerRack, parityPerRack map[string][]erasure_coding.ShardId) { + dataPerRack = make(map[string][]erasure_coding.ShardId) + parityPerRack = make(map[string][]erasure_coding.ShardId) + for _, ecNode := range locations { + shardBits := findEcVolumeShards(ecNode, vid, diskType) + rackId := string(ecNode.rack) + for _, shardId := range shardBits.ShardIds() { + if int(shardId) < dataShards { + dataPerRack[rackId] = append(dataPerRack[rackId], shardId) + } else { + parityPerRack[rackId] = append(parityPerRack[rackId], shardId) + } + } + } + return +} + func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needle.VolumeId, locations []*EcNode) error { racks := ecb.racks() + numRacks := len(racks) + + // Use configured EC scheme for shard type classification (defaults to 10+4) + dataShardCount := ecb.getDataShardCount() + parityShardCount := ecb.getParityShardCount() + + // Get current distribution of data shards per rack (parity computed after data balancing) + dataPerRack, _ := shardsByTypePerRack(vid, locations, ecb.diskType, dataShardCount) + + // Calculate max shards per rack for each type to ensure even spread + // Data: 10 shards / 6 racks = max 2 per rack + // Parity: 4 shards / 6 racks = max 1 per rack (with 2 racks having 0) + maxDataPerRack := ceilDivide(dataShardCount, numRacks) + maxParityPerRack := ceilDivide(parityShardCount, numRacks) - // see the volume's shards are in how many racks, and how many in each rack + rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string { + return string(ecNode.rack) + }) + + // Track total shard count per rack for slot management rackToShardCount := countShardsByRack(vid, locations, ecb.diskType) - // Calculate actual total shards for this volume (not hardcoded default) - var totalShardsForVolume int - for _, count := range rackToShardCount { - totalShardsForVolume += count + // First pass: Balance data shards across racks + if err := ecb.balanceShardTypeAcrossRacks(collection, vid, racks, rackEcNodesWithVid, dataPerRack, rackToShardCount, maxDataPerRack, "data"); err != nil { + return err } - // calculate average number of shards an ec rack should have for one volume - 
averageShardsPerEcRack := ceilDivide(totalShardsForVolume, len(racks)) - rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string { + + // Refresh locations after data shard moves and get parity distribution + locations = ecb.collectVolumeIdToEcNodes(collection)[vid] + _, parityPerRack := shardsByTypePerRack(vid, locations, ecb.diskType, dataShardCount) + rackEcNodesWithVid = groupBy(locations, func(ecNode *EcNode) string { return string(ecNode.rack) }) + rackToShardCount = countShardsByRack(vid, locations, ecb.diskType) + + // Second pass: Balance parity shards across racks + if err := ecb.balanceShardTypeAcrossRacks(collection, vid, racks, rackEcNodesWithVid, parityPerRack, rackToShardCount, maxParityPerRack, "parity"); err != nil { + return err + } + + return nil +} - // ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack - ecShardsToMove := make(map[erasure_coding.ShardId]*EcNode) - for rackId, count := range rackToShardCount { - if count <= averageShardsPerEcRack { +// balanceShardTypeAcrossRacks spreads shards of a specific type (data or parity) evenly across racks +func (ecb *ecBalancer) balanceShardTypeAcrossRacks( + collection string, + vid needle.VolumeId, + racks map[RackId]*EcRack, + rackEcNodesWithVid map[string][]*EcNode, + shardsPerRack map[string][]erasure_coding.ShardId, + rackToShardCount map[string]int, + maxPerRack int, + shardType string, +) error { + // Find racks with too many shards of this type + shardsToMove := make(map[erasure_coding.ShardId]*EcNode) + for rackId, shards := range shardsPerRack { + if len(shards) <= maxPerRack { continue } - possibleEcNodes := rackEcNodesWithVid[rackId] - for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack, ecb.diskType) { - ecShardsToMove[shardId] = ecNode + // Pick excess shards to move + excess := len(shards) - maxPerRack + ecNodes := rackEcNodesWithVid[rackId] + for i := 0; i < excess && i < len(shards); i++ { + shardId := shards[i] + // Find which node has this shard + for _, ecNode := range ecNodes { + shardBits := findEcVolumeShards(ecNode, vid, ecb.diskType) + if shardBits.HasShardId(shardId) { + shardsToMove[shardId] = ecNode + break + } + } } } - for shardId, ecNode := range ecShardsToMove { - rackId, err := ecb.pickRackToBalanceShardsInto(racks, rackToShardCount) + // Move shards to racks that have fewer than maxPerRack of this type + for shardId, ecNode := range shardsToMove { + // Find destination rack with room for this shard type + destRackId, err := ecb.pickRackForShardType(racks, shardsPerRack, maxPerRack, rackToShardCount) if err != nil { - fmt.Printf("ec shard %d.%d at %s can not find a destination rack:\n%s\n", vid, shardId, ecNode.info.Id, err.Error()) + fmt.Printf("ec %s shard %d.%d at %s can not find a destination rack:\n%s\n", shardType, vid, shardId, ecNode.info.Id, err.Error()) continue } var possibleDestinationEcNodes []*EcNode - for _, n := range racks[rackId].ecNodes { + for _, n := range racks[destRackId].ecNodes { possibleDestinationEcNodes = append(possibleDestinationEcNodes, n) } err = ecb.pickOneEcNodeAndMoveOneShard(ecNode, collection, vid, shardId, possibleDestinationEcNodes) if err != nil { return err } - rackToShardCount[string(rackId)] += 1 - rackToShardCount[string(ecNode.rack)] -= 1 - racks[rackId].freeEcSlot -= 1 - racks[ecNode.rack].freeEcSlot += 1 + + // Update tracking + shardsPerRack[string(destRackId)] = append(shardsPerRack[string(destRackId)], shardId) + // Remove from 
source rack + srcRack := string(ecNode.rack) + for i, s := range shardsPerRack[srcRack] { + if s == shardId { + shardsPerRack[srcRack] = append(shardsPerRack[srcRack][:i], shardsPerRack[srcRack][i+1:]...) + break + } + } + rackToShardCount[string(destRackId)] += 1 + rackToShardCount[srcRack] -= 1 + racks[destRackId].freeEcSlot -= 1 + racks[RackId(srcRack)].freeEcSlot += 1 } return nil } +// pickRackForShardType selects a rack that has room for more shards of a specific type +func (ecb *ecBalancer) pickRackForShardType( + rackToEcNodes map[RackId]*EcRack, + shardsPerRack map[string][]erasure_coding.ShardId, + maxPerRack int, + rackToShardCount map[string]int, +) (RackId, error) { + var candidates []RackId + minShards := maxPerRack + 1 + + for rackId, rack := range rackToEcNodes { + if rack.freeEcSlot <= 0 { + continue + } + currentCount := len(shardsPerRack[string(rackId)]) + if currentCount >= maxPerRack { + continue + } + // For EC shards, replica placement constraint only applies when DiffRackCount > 0. + if ecb.replicaPlacement != nil && ecb.replicaPlacement.DiffRackCount > 0 && rackToShardCount[string(rackId)] >= ecb.replicaPlacement.DiffRackCount { + continue + } + if currentCount < minShards { + candidates = nil + minShards = currentCount + } + if currentCount == minShards { + candidates = append(candidates, rackId) + } + } + + if len(candidates) == 0 { + return "", errors.New("no rack available for shard type balancing") + } + return candidates[rand.IntN(len(candidates))], nil +} + func (ecb *ecBalancer) pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int) (RackId, error) { targets := []RackId{} targetShards := -1 @@ -855,7 +991,11 @@ func (ecb *ecBalancer) pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcR details += fmt.Sprintf(" Skipped %s because it has no free slots\n", rackId) continue } - if ecb.replicaPlacement != nil && shards > ecb.replicaPlacement.DiffRackCount { + // For EC shards, replica placement constraint only applies when DiffRackCount > 0. + // When DiffRackCount = 0 (e.g., replica placement "000"), EC shards should be + // distributed freely across racks for fault tolerance - the "000" means + // "no volume replication needed" because erasure coding provides redundancy. + if ecb.replicaPlacement != nil && ecb.replicaPlacement.DiffRackCount > 0 && shards > ecb.replicaPlacement.DiffRackCount { details += fmt.Sprintf(" Skipped %s because shards %d > replica placement limit for other racks (%d)\n", rackId, shards, ecb.replicaPlacement.DiffRackCount) continue } @@ -1056,7 +1196,11 @@ func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existi } shards := nodeShards[node] - if ecb.replicaPlacement != nil && shards > ecb.replicaPlacement.SameRackCount+1 { + // For EC shards, replica placement constraint only applies when SameRackCount > 0. + // When SameRackCount = 0 (e.g., replica placement "000"), EC shards should be + // distributed freely within racks - the "000" means "no volume replication needed" + // because erasure coding provides redundancy. 
+ if ecb.replicaPlacement != nil && ecb.replicaPlacement.SameRackCount > 0 && shards > ecb.replicaPlacement.SameRackCount+1 { details += fmt.Sprintf(" Skipped %s because shards %d > replica placement limit for the rack (%d + 1)\n", node.info.Id, shards, ecb.replicaPlacement.SameRackCount) continue } diff --git a/weed/shell/command_ec_common_test.go b/weed/shell/command_ec_common_test.go index 8f5f4a1b9..ff186f21d 100644 --- a/weed/shell/command_ec_common_test.go +++ b/weed/shell/command_ec_common_test.go @@ -133,7 +133,9 @@ func TestPickRackToBalanceShardsInto(t *testing.T) { {testTopologyEc, "6241", "123", []string{"rack1", "rack2", "rack3", "rack4", "rack5", "rack6"}, ""}, {testTopologyEc, "6242", "123", []string{"rack1", "rack2", "rack3", "rack4", "rack5", "rack6"}, ""}, // EC volumes. - {testTopologyEc, "9577", "", nil, "shards 1 > replica placement limit for other racks (0)"}, + // With replication "000" (DiffRackCount=0), EC shards should be distributed freely + // because erasure coding provides its own redundancy. No replica placement error. + {testTopologyEc, "9577", "", []string{"rack1", "rack2", "rack3"}, ""}, {testTopologyEc, "9577", "111", []string{"rack1", "rack2", "rack3"}, ""}, {testTopologyEc, "9577", "222", []string{"rack1", "rack2", "rack3"}, ""}, {testTopologyEc, "10457", "222", []string{"rack1"}, ""}, diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 33da76664..f3c9439da 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -94,7 +94,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr shardReplicaPlacement := encodeCommand.String("shardReplicaPlacement", "", "replica placement for EC shards, or master default if empty") sourceDiskTypeStr := encodeCommand.String("sourceDiskType", "", "filter source volumes by disk type (hdd, ssd, or empty for all)") diskTypeStr := encodeCommand.String("diskType", "", "target disk type for EC shards (hdd, ssd, or empty for default hdd)") - applyBalancing := encodeCommand.Bool("rebalance", false, "re-balance EC shards after creation") + applyBalancing := encodeCommand.Bool("rebalance", true, "re-balance EC shards after creation (default: true)") verbose := encodeCommand.Bool("verbose", false, "show detailed reasons why volumes are not selected for encoding") if err = encodeCommand.Parse(args); err != nil { @@ -164,6 +164,32 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr return fmt.Errorf("failed to collect volume locations before EC encoding: %w", err) } + // Pre-flight check: verify the target disk type has capacity for EC shards + // This prevents encoding shards only to fail during rebalance + _, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, "", diskType) + if err != nil { + return fmt.Errorf("failed to check EC shard capacity: %w", err) + } + + // Calculate required slots: each volume needs TotalShardsCount (14) shards distributed + requiredSlots := len(volumeIds) * erasure_coding.TotalShardsCount + if totalFreeEcSlots < 1 { + // No capacity at all on the target disk type + if diskType != types.HardDriveType { + return fmt.Errorf("no free ec shard slots on disk type '%s'. The target disk type has no capacity.\n"+ + "Your volumes are likely on a different disk type. Try:\n"+ + " ec.encode -collection=%s -diskType=hdd\n"+ + "Or omit -diskType to use the default (hdd)", diskType, *collection) + } + return fmt.Errorf("no free ec shard slots. 
only %d left on disk type '%s'", totalFreeEcSlots, diskType) + } + + if totalFreeEcSlots < requiredSlots { + fmt.Printf("Warning: limited EC shard capacity. Need %d slots for %d volumes, but only %d slots available on disk type '%s'.\n", + requiredSlots, len(volumeIds), totalFreeEcSlots, diskType) + fmt.Printf("Rebalancing may not achieve optimal distribution.\n") + } + // encode all requested volumes... if err = doEcEncode(commandEnv, writer, volumeIdToCollection, volumeIds, *maxParallelization); err != nil { return fmt.Errorf("ec encode for volumes %v: %w", volumeIds, err) diff --git a/weed/shell/command_ec_test.go b/weed/shell/command_ec_test.go index 7d7b59f8f..073c80ad4 100644 --- a/weed/shell/command_ec_test.go +++ b/weed/shell/command_ec_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/types" ) @@ -136,3 +137,151 @@ func newEcNode(dc string, rack string, dataNodeId string, freeEcSlot int) *EcNod func (ecNode *EcNode) addEcVolumeAndShardsForTest(vid uint32, collection string, shardIds []uint32) *EcNode { return ecNode.addEcVolumeShards(needle.VolumeId(vid), collection, shardIds, types.HardDriveType) } + +// TestCommandEcBalanceEvenDataAndParityDistribution verifies that after balancing: +// 1. Data shards (0-9) are evenly distributed across racks (max 2 per rack for 6 racks) +// 2. Parity shards (10-13) are evenly distributed across racks (max 1 per rack for 6 racks) +func TestCommandEcBalanceEvenDataAndParityDistribution(t *testing.T) { + // Setup: All 14 shards start on rack1 (simulating fresh EC encode) + ecb := &ecBalancer{ + ecNodes: []*EcNode{ + // All shards initially on rack1/dn1 + newEcNode("dc1", "rack1", "dn1", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}), + // Empty nodes on other racks + newEcNode("dc1", "rack2", "dn2", 100), + newEcNode("dc1", "rack3", "dn3", 100), + newEcNode("dc1", "rack4", "dn4", 100), + newEcNode("dc1", "rack5", "dn5", 100), + newEcNode("dc1", "rack6", "dn6", 100), + }, + applyBalancing: false, // Dry-run mode (simulates moves by updating internal state) + diskType: types.HardDriveType, + } + + ecb.balanceEcVolumes("c1") + + // After balancing (dry-run), verify the PLANNED distribution by checking what moves were proposed + // The ecb.ecNodes state is updated during dry-run to track planned moves + vid := needle.VolumeId(1) + dataShardCount := erasure_coding.DataShardsCount // 10 + parityShardCount := erasure_coding.ParityShardsCount // 4 + + // Count data and parity shards per rack based on current (updated) state + dataPerRack, parityPerRack := countDataAndParityShardsPerRack(ecb.ecNodes, vid, dataShardCount) + + // With 6 racks: + // - Data shards (10): max 2 per rack (ceil(10/6) = 2) + // - Parity shards (4): max 1 per rack (ceil(4/6) = 1) + maxDataPerRack := ceilDivide(dataShardCount, 6) // 2 + maxParityPerRack := ceilDivide(parityShardCount, 6) // 1 + + // Verify no rack has more than max data shards + for rackId, count := range dataPerRack { + if count > maxDataPerRack { + t.Errorf("rack %s has %d data shards, expected max %d", rackId, count, maxDataPerRack) + } + } + + // Verify no rack has more than max parity shards + for rackId, count := range parityPerRack { + if count > maxParityPerRack { + t.Errorf("rack %s has %d parity shards, expected max %d", rackId, count, maxParityPerRack) + 
} + } + + // Verify all shards are distributed (total counts) + totalData := 0 + totalParity := 0 + for _, count := range dataPerRack { + totalData += count + } + for _, count := range parityPerRack { + totalParity += count + } + if totalData != dataShardCount { + t.Errorf("total data shards = %d, expected %d", totalData, dataShardCount) + } + if totalParity != parityShardCount { + t.Errorf("total parity shards = %d, expected %d", totalParity, parityShardCount) + } + + // Verify data shards are spread across at least 5 racks (10 shards / 2 max per rack) + racksWithData := len(dataPerRack) + minRacksForData := dataShardCount / maxDataPerRack // At least 5 racks needed for 10 data shards + if racksWithData < minRacksForData { + t.Errorf("data shards spread across only %d racks, expected at least %d", racksWithData, minRacksForData) + } + + // Verify parity shards are spread across at least 4 racks (4 shards / 1 max per rack) + racksWithParity := len(parityPerRack) + if racksWithParity < parityShardCount { + t.Errorf("parity shards spread across only %d racks, expected at least %d", racksWithParity, parityShardCount) + } + + t.Logf("Distribution after balancing:") + t.Logf(" Data shards per rack: %v (max allowed: %d)", dataPerRack, maxDataPerRack) + t.Logf(" Parity shards per rack: %v (max allowed: %d)", parityPerRack, maxParityPerRack) +} + +// countDataAndParityShardsPerRack counts data and parity shards per rack +func countDataAndParityShardsPerRack(ecNodes []*EcNode, vid needle.VolumeId, dataShardCount int) (dataPerRack, parityPerRack map[string]int) { + dataPerRack = make(map[string]int) + parityPerRack = make(map[string]int) + + for _, ecNode := range ecNodes { + shardBits := findEcVolumeShards(ecNode, vid, types.HardDriveType) + for _, shardId := range shardBits.ShardIds() { + rackId := string(ecNode.rack) + if int(shardId) < dataShardCount { + dataPerRack[rackId]++ + } else { + parityPerRack[rackId]++ + } + } + } + return +} + +// TestCommandEcBalanceMultipleVolumesEvenDistribution tests that multiple volumes +// each get their data and parity shards evenly distributed +func TestCommandEcBalanceMultipleVolumesEvenDistribution(t *testing.T) { + // Setup: Two volumes, each with all 14 shards on different starting racks + ecb := &ecBalancer{ + ecNodes: []*EcNode{ + // Volume 1: all shards on rack1 + newEcNode("dc1", "rack1", "dn1", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}), + // Volume 2: all shards on rack2 + newEcNode("dc1", "rack2", "dn2", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}), + // Empty nodes on other racks + newEcNode("dc1", "rack3", "dn3", 100), + newEcNode("dc1", "rack4", "dn4", 100), + newEcNode("dc1", "rack5", "dn5", 100), + newEcNode("dc1", "rack6", "dn6", 100), + }, + applyBalancing: false, // Dry-run mode + diskType: types.HardDriveType, + } + + ecb.balanceEcVolumes("c1") + + // Check both volumes + for _, vid := range []needle.VolumeId{1, 2} { + dataPerRack, parityPerRack := countDataAndParityShardsPerRack(ecb.ecNodes, vid, erasure_coding.DataShardsCount) + + maxDataPerRack := ceilDivide(erasure_coding.DataShardsCount, 6) + maxParityPerRack := ceilDivide(erasure_coding.ParityShardsCount, 6) + + for rackId, count := range dataPerRack { + if count > maxDataPerRack { + t.Errorf("volume %d: rack %s has %d data shards, expected max %d", vid, rackId, count, maxDataPerRack) + } + } + for rackId, count := range parityPerRack { + if count > maxParityPerRack { + 
t.Errorf("volume %d: rack %s has %d parity shards, expected max %d", vid, rackId, count, maxParityPerRack) + } + } + + t.Logf("Volume %d - Data: %v, Parity: %v", vid, dataPerRack, parityPerRack) + } +} diff --git a/weed/shell/ec_proportional_rebalance.go b/weed/shell/ec_proportional_rebalance.go new file mode 100644 index 000000000..d01803001 --- /dev/null +++ b/weed/shell/ec_proportional_rebalance.go @@ -0,0 +1,284 @@ +package shell + +import ( + "fmt" + + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/distribution" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + "github.com/seaweedfs/seaweedfs/weed/storage/types" +) + +// ECDistribution is an alias to the distribution package type for backward compatibility +type ECDistribution = distribution.ECDistribution + +// CalculateECDistribution computes the target EC shard distribution based on replication policy. +// This is a convenience wrapper that uses the default 10+4 EC configuration. +// For custom EC ratios, use the distribution package directly. +func CalculateECDistribution(totalShards, parityShards int, rp *super_block.ReplicaPlacement) *ECDistribution { + ec := distribution.ECConfig{ + DataShards: totalShards - parityShards, + ParityShards: parityShards, + } + rep := distribution.NewReplicationConfig(rp) + return distribution.CalculateDistribution(ec, rep) +} + +// TopologyDistributionAnalysis holds the current shard distribution analysis +// This wraps the distribution package's TopologyAnalysis with shell-specific EcNode handling +type TopologyDistributionAnalysis struct { + inner *distribution.TopologyAnalysis + + // Shell-specific mappings + nodeMap map[string]*EcNode // nodeID -> EcNode +} + +// NewTopologyDistributionAnalysis creates a new analysis structure +func NewTopologyDistributionAnalysis() *TopologyDistributionAnalysis { + return &TopologyDistributionAnalysis{ + inner: distribution.NewTopologyAnalysis(), + nodeMap: make(map[string]*EcNode), + } +} + +// AddNode adds a node and its shards to the analysis +func (a *TopologyDistributionAnalysis) AddNode(node *EcNode, shardBits erasure_coding.ShardBits) { + nodeId := node.info.Id + + // Create distribution.TopologyNode from EcNode + topoNode := &distribution.TopologyNode{ + NodeID: nodeId, + DataCenter: string(node.dc), + Rack: string(node.rack), + FreeSlots: node.freeEcSlot, + TotalShards: shardBits.ShardIdCount(), + } + + for _, shardId := range shardBits.ShardIds() { + topoNode.ShardIDs = append(topoNode.ShardIDs, int(shardId)) + } + + a.inner.AddNode(topoNode) + a.nodeMap[nodeId] = node + + // Add shard locations + for _, shardId := range shardBits.ShardIds() { + a.inner.AddShardLocation(distribution.ShardLocation{ + ShardID: int(shardId), + NodeID: nodeId, + DataCenter: string(node.dc), + Rack: string(node.rack), + }) + } +} + +// Finalize completes the analysis +func (a *TopologyDistributionAnalysis) Finalize() { + a.inner.Finalize() +} + +// String returns a summary +func (a *TopologyDistributionAnalysis) String() string { + return a.inner.String() +} + +// DetailedString returns detailed analysis +func (a *TopologyDistributionAnalysis) DetailedString() string { + return a.inner.DetailedString() +} + +// GetShardsByDC returns shard counts by DC +func (a *TopologyDistributionAnalysis) GetShardsByDC() map[DataCenterId]int { + result := make(map[DataCenterId]int) + for dc, count := range a.inner.ShardsByDC { + 
result[DataCenterId(dc)] = count + } + return result +} + +// GetShardsByRack returns shard counts by rack +func (a *TopologyDistributionAnalysis) GetShardsByRack() map[RackId]int { + result := make(map[RackId]int) + for rack, count := range a.inner.ShardsByRack { + result[RackId(rack)] = count + } + return result +} + +// GetShardsByNode returns shard counts by node +func (a *TopologyDistributionAnalysis) GetShardsByNode() map[EcNodeId]int { + result := make(map[EcNodeId]int) + for nodeId, count := range a.inner.ShardsByNode { + result[EcNodeId(nodeId)] = count + } + return result +} + +// AnalyzeVolumeDistribution creates an analysis of current shard distribution for a volume +func AnalyzeVolumeDistribution(volumeId needle.VolumeId, locations []*EcNode, diskType types.DiskType) *TopologyDistributionAnalysis { + analysis := NewTopologyDistributionAnalysis() + + for _, node := range locations { + shardBits := findEcVolumeShards(node, volumeId, diskType) + if shardBits.ShardIdCount() > 0 { + analysis.AddNode(node, shardBits) + } + } + + analysis.Finalize() + return analysis +} + +// ECShardMove represents a planned shard move (shell-specific with EcNode references) +type ECShardMove struct { + VolumeId needle.VolumeId + ShardId erasure_coding.ShardId + SourceNode *EcNode + DestNode *EcNode + Reason string +} + +// String returns a human-readable description +func (m ECShardMove) String() string { + return fmt.Sprintf("volume %d shard %d: %s -> %s (%s)", + m.VolumeId, m.ShardId, m.SourceNode.info.Id, m.DestNode.info.Id, m.Reason) +} + +// ProportionalECRebalancer implements proportional shard distribution for shell commands +type ProportionalECRebalancer struct { + ecNodes []*EcNode + replicaPlacement *super_block.ReplicaPlacement + diskType types.DiskType + ecConfig distribution.ECConfig +} + +// NewProportionalECRebalancer creates a new proportional rebalancer with default EC config +func NewProportionalECRebalancer( + ecNodes []*EcNode, + rp *super_block.ReplicaPlacement, + diskType types.DiskType, +) *ProportionalECRebalancer { + return NewProportionalECRebalancerWithConfig( + ecNodes, + rp, + diskType, + distribution.DefaultECConfig(), + ) +} + +// NewProportionalECRebalancerWithConfig creates a rebalancer with custom EC configuration +func NewProportionalECRebalancerWithConfig( + ecNodes []*EcNode, + rp *super_block.ReplicaPlacement, + diskType types.DiskType, + ecConfig distribution.ECConfig, +) *ProportionalECRebalancer { + return &ProportionalECRebalancer{ + ecNodes: ecNodes, + replicaPlacement: rp, + diskType: diskType, + ecConfig: ecConfig, + } +} + +// PlanMoves generates a plan for moving shards to achieve proportional distribution +func (r *ProportionalECRebalancer) PlanMoves( + volumeId needle.VolumeId, + locations []*EcNode, +) ([]ECShardMove, error) { + // Build topology analysis + analysis := distribution.NewTopologyAnalysis() + nodeMap := make(map[string]*EcNode) + + // Add all EC nodes to the analysis (even those without shards) + for _, node := range r.ecNodes { + nodeId := node.info.Id + topoNode := &distribution.TopologyNode{ + NodeID: nodeId, + DataCenter: string(node.dc), + Rack: string(node.rack), + FreeSlots: node.freeEcSlot, + } + analysis.AddNode(topoNode) + nodeMap[nodeId] = node + } + + // Add shard locations from nodes that have shards + for _, node := range locations { + nodeId := node.info.Id + shardBits := findEcVolumeShards(node, volumeId, r.diskType) + for _, shardId := range shardBits.ShardIds() { + analysis.AddShardLocation(distribution.ShardLocation{ + 
ShardID: int(shardId), + NodeID: nodeId, + DataCenter: string(node.dc), + Rack: string(node.rack), + }) + } + if _, exists := nodeMap[nodeId]; !exists { + nodeMap[nodeId] = node + } + } + + analysis.Finalize() + + // Create rebalancer and plan moves + rep := distribution.NewReplicationConfig(r.replicaPlacement) + rebalancer := distribution.NewRebalancer(r.ecConfig, rep) + + plan, err := rebalancer.PlanRebalance(analysis) + if err != nil { + return nil, err + } + + // Convert distribution moves to shell moves + var moves []ECShardMove + for _, move := range plan.Moves { + srcNode := nodeMap[move.SourceNode.NodeID] + destNode := nodeMap[move.DestNode.NodeID] + if srcNode == nil || destNode == nil { + continue + } + + moves = append(moves, ECShardMove{ + VolumeId: volumeId, + ShardId: erasure_coding.ShardId(move.ShardID), + SourceNode: srcNode, + DestNode: destNode, + Reason: move.Reason, + }) + } + + return moves, nil +} + +// GetDistributionSummary returns a summary of the planned distribution +func GetDistributionSummary(rp *super_block.ReplicaPlacement) string { + ec := distribution.DefaultECConfig() + rep := distribution.NewReplicationConfig(rp) + dist := distribution.CalculateDistribution(ec, rep) + return dist.Summary() +} + +// GetDistributionSummaryWithConfig returns a summary with custom EC configuration +func GetDistributionSummaryWithConfig(rp *super_block.ReplicaPlacement, ecConfig distribution.ECConfig) string { + rep := distribution.NewReplicationConfig(rp) + dist := distribution.CalculateDistribution(ecConfig, rep) + return dist.Summary() +} + +// GetFaultToleranceAnalysis returns fault tolerance analysis for the given configuration +func GetFaultToleranceAnalysis(rp *super_block.ReplicaPlacement) string { + ec := distribution.DefaultECConfig() + rep := distribution.NewReplicationConfig(rp) + dist := distribution.CalculateDistribution(ec, rep) + return dist.FaultToleranceAnalysis() +} + +// GetFaultToleranceAnalysisWithConfig returns fault tolerance analysis with custom EC configuration +func GetFaultToleranceAnalysisWithConfig(rp *super_block.ReplicaPlacement, ecConfig distribution.ECConfig) string { + rep := distribution.NewReplicationConfig(rp) + dist := distribution.CalculateDistribution(ecConfig, rep) + return dist.FaultToleranceAnalysis() +} diff --git a/weed/shell/ec_proportional_rebalance_test.go b/weed/shell/ec_proportional_rebalance_test.go new file mode 100644 index 000000000..c8ec99e0a --- /dev/null +++ b/weed/shell/ec_proportional_rebalance_test.go @@ -0,0 +1,251 @@ +package shell + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/distribution" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + "github.com/seaweedfs/seaweedfs/weed/storage/types" +) + +func TestCalculateECDistributionShell(t *testing.T) { + // Test the shell wrapper function + rp, _ := super_block.NewReplicaPlacementFromString("110") + + dist := CalculateECDistribution( + erasure_coding.TotalShardsCount, + erasure_coding.ParityShardsCount, + rp, + ) + + if dist.ReplicationConfig.MinDataCenters != 2 { + t.Errorf("Expected 2 DCs, got %d", dist.ReplicationConfig.MinDataCenters) + } + if dist.TargetShardsPerDC != 7 { + t.Errorf("Expected 7 shards per DC, got %d", dist.TargetShardsPerDC) + } + + t.Log(dist.Summary()) +} + +func TestAnalyzeVolumeDistributionShell(t *testing.T) { + diskType 
:= types.HardDriveType + diskTypeKey := string(diskType) + + // Build a topology with unbalanced distribution + node1 := &EcNode{ + info: &master_pb.DataNodeInfo{ + Id: "127.0.0.1:8080", + DiskInfos: map[string]*master_pb.DiskInfo{ + diskTypeKey: { + Type: diskTypeKey, + MaxVolumeCount: 10, + EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{ + { + Id: 1, + Collection: "test", + EcIndexBits: 0x3FFF, // All 14 shards + }, + }, + }, + }, + }, + dc: "dc1", + rack: "rack1", + freeEcSlot: 5, + } + + node2 := &EcNode{ + info: &master_pb.DataNodeInfo{ + Id: "127.0.0.1:8081", + DiskInfos: map[string]*master_pb.DiskInfo{ + diskTypeKey: { + Type: diskTypeKey, + MaxVolumeCount: 10, + EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{}, + }, + }, + }, + dc: "dc2", + rack: "rack2", + freeEcSlot: 10, + } + + locations := []*EcNode{node1, node2} + volumeId := needle.VolumeId(1) + + analysis := AnalyzeVolumeDistribution(volumeId, locations, diskType) + + shardsByDC := analysis.GetShardsByDC() + if shardsByDC["dc1"] != 14 { + t.Errorf("Expected 14 shards in dc1, got %d", shardsByDC["dc1"]) + } + + t.Log(analysis.DetailedString()) +} + +func TestProportionalRebalancerShell(t *testing.T) { + diskType := types.HardDriveType + diskTypeKey := string(diskType) + + // Build topology: 2 DCs, 2 racks each, all shards on one node + nodes := []*EcNode{ + { + info: &master_pb.DataNodeInfo{ + Id: "dc1-rack1-node1", + DiskInfos: map[string]*master_pb.DiskInfo{ + diskTypeKey: { + Type: diskTypeKey, + MaxVolumeCount: 10, + EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{ + {Id: 1, Collection: "test", EcIndexBits: 0x3FFF}, + }, + }, + }, + }, + dc: "dc1", rack: "dc1-rack1", freeEcSlot: 0, + }, + { + info: &master_pb.DataNodeInfo{ + Id: "dc1-rack2-node1", + DiskInfos: map[string]*master_pb.DiskInfo{ + diskTypeKey: {Type: diskTypeKey, MaxVolumeCount: 10}, + }, + }, + dc: "dc1", rack: "dc1-rack2", freeEcSlot: 10, + }, + { + info: &master_pb.DataNodeInfo{ + Id: "dc2-rack1-node1", + DiskInfos: map[string]*master_pb.DiskInfo{ + diskTypeKey: {Type: diskTypeKey, MaxVolumeCount: 10}, + }, + }, + dc: "dc2", rack: "dc2-rack1", freeEcSlot: 10, + }, + { + info: &master_pb.DataNodeInfo{ + Id: "dc2-rack2-node1", + DiskInfos: map[string]*master_pb.DiskInfo{ + diskTypeKey: {Type: diskTypeKey, MaxVolumeCount: 10}, + }, + }, + dc: "dc2", rack: "dc2-rack2", freeEcSlot: 10, + }, + } + + rp, _ := super_block.NewReplicaPlacementFromString("110") + rebalancer := NewProportionalECRebalancer(nodes, rp, diskType) + + volumeId := needle.VolumeId(1) + moves, err := rebalancer.PlanMoves(volumeId, []*EcNode{nodes[0]}) + + if err != nil { + t.Fatalf("PlanMoves failed: %v", err) + } + + t.Logf("Planned %d moves", len(moves)) + for i, move := range moves { + t.Logf(" %d. 
%s", i+1, move.String()) + } + + // Verify moves to dc2 + movedToDC2 := 0 + for _, move := range moves { + if move.DestNode.dc == "dc2" { + movedToDC2++ + } + } + + if movedToDC2 == 0 { + t.Error("Expected some moves to dc2") + } +} + +func TestCustomECConfigRebalancer(t *testing.T) { + diskType := types.HardDriveType + diskTypeKey := string(diskType) + + // Test with custom 8+4 EC configuration + ecConfig, err := distribution.NewECConfig(8, 4) + if err != nil { + t.Fatalf("Failed to create EC config: %v", err) + } + + // Build topology for 12 shards (8+4) + nodes := []*EcNode{ + { + info: &master_pb.DataNodeInfo{ + Id: "dc1-node1", + DiskInfos: map[string]*master_pb.DiskInfo{ + diskTypeKey: { + Type: diskTypeKey, + MaxVolumeCount: 10, + EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{ + {Id: 1, Collection: "test", EcIndexBits: 0x0FFF}, // 12 shards (bits 0-11) + }, + }, + }, + }, + dc: "dc1", rack: "dc1-rack1", freeEcSlot: 0, + }, + { + info: &master_pb.DataNodeInfo{ + Id: "dc2-node1", + DiskInfos: map[string]*master_pb.DiskInfo{ + diskTypeKey: {Type: diskTypeKey, MaxVolumeCount: 10}, + }, + }, + dc: "dc2", rack: "dc2-rack1", freeEcSlot: 10, + }, + { + info: &master_pb.DataNodeInfo{ + Id: "dc3-node1", + DiskInfos: map[string]*master_pb.DiskInfo{ + diskTypeKey: {Type: diskTypeKey, MaxVolumeCount: 10}, + }, + }, + dc: "dc3", rack: "dc3-rack1", freeEcSlot: 10, + }, + } + + rp, _ := super_block.NewReplicaPlacementFromString("200") // 3 DCs + rebalancer := NewProportionalECRebalancerWithConfig(nodes, rp, diskType, ecConfig) + + volumeId := needle.VolumeId(1) + moves, err := rebalancer.PlanMoves(volumeId, []*EcNode{nodes[0]}) + + if err != nil { + t.Fatalf("PlanMoves failed: %v", err) + } + + t.Logf("Custom 8+4 EC with 200 replication: planned %d moves", len(moves)) + + // Get the distribution summary + summary := GetDistributionSummaryWithConfig(rp, ecConfig) + t.Log(summary) + + analysis := GetFaultToleranceAnalysisWithConfig(rp, ecConfig) + t.Log(analysis) +} + +func TestGetDistributionSummaryShell(t *testing.T) { + rp, _ := super_block.NewReplicaPlacementFromString("110") + + summary := GetDistributionSummary(rp) + t.Log(summary) + + if len(summary) == 0 { + t.Error("Summary should not be empty") + } + + analysis := GetFaultToleranceAnalysis(rp) + t.Log(analysis) + + if len(analysis) == 0 { + t.Error("Analysis should not be empty") + } +} diff --git a/weed/shell/ec_rebalance_slots_test.go b/weed/shell/ec_rebalance_slots_test.go new file mode 100644 index 000000000..4a55c9bce --- /dev/null +++ b/weed/shell/ec_rebalance_slots_test.go @@ -0,0 +1,293 @@ +package shell + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/storage/types" +) + +// TestECRebalanceWithLimitedSlots tests that EC rebalance handles the scenario +// where there are limited free slots on volume servers. +// +// This is a regression test for the error: +// +// "no free ec shard slots. 
only 0 left" +// +// Scenario (from real usage): +// - 6 volume servers in 6 racks +// - Each server has max=10 volume slots +// - 7 volumes were EC encoded (7 × 14 = 98 EC shards) +// - All 14 shards per volume are on the original server (not yet distributed) +// +// Expected behavior: +// - The rebalance algorithm should distribute shards across servers +// - Even if perfect distribution isn't possible, it should do best-effort +// - Currently fails with "no free ec shard slots" because freeSlots calculation +// +// doesn't account for shards being moved (freed slots on source, used on target) +func TestECRebalanceWithLimitedSlots(t *testing.T) { + // Build a topology matching the problematic scenario: + // 6 servers, each with 2+ volumes worth of EC shards (all 14 shards per volume on same server) + topology := buildLimitedSlotsTopology() + + // Collect EC nodes from the topology + ecNodes, totalFreeEcSlots := collectEcVolumeServersByDc(topology, "", types.HardDriveType) + + t.Logf("Topology summary:") + t.Logf(" Number of EC nodes: %d", len(ecNodes)) + t.Logf(" Total free EC slots: %d", totalFreeEcSlots) + + // Log per-node details + for _, node := range ecNodes { + shardCount := 0 + for _, diskInfo := range node.info.DiskInfos { + for _, ecShard := range diskInfo.EcShardInfos { + shardCount += erasure_coding.ShardBits(ecShard.EcIndexBits).ShardIdCount() + } + } + t.Logf(" Node %s (rack %s): %d shards, %d free slots", + node.info.Id, node.rack, shardCount, node.freeEcSlot) + } + + // Calculate total EC shards + totalEcShards := 0 + for _, node := range ecNodes { + for _, diskInfo := range node.info.DiskInfos { + for _, ecShard := range diskInfo.EcShardInfos { + totalEcShards += erasure_coding.ShardBits(ecShard.EcIndexBits).ShardIdCount() + } + } + } + t.Logf(" Total EC shards: %d", totalEcShards) + + // Document the issue: + // With 98 EC shards (7 volumes × 14 shards) on 6 servers with max=10 each, + // total capacity is 60 slots. But shards already occupy slots on their current servers. + // + // The current algorithm calculates free slots as: + // freeSlots = maxVolumeCount - volumeCount - ecShardCount + // + // If all shards are on their original servers: + // - Server A has 28 shards (2 volumes × 14) → may have negative free slots + // - This causes totalFreeEcSlots to be 0 or negative + // + // The EXPECTED improvement: + // - Rebalance should recognize that moving a shard FREES a slot on the source + // - The algorithm should work iteratively, moving shards one at a time + // - Even if starting with 0 free slots, moving one shard opens a slot + + if totalFreeEcSlots < 1 { + // This is the current (buggy) behavior we're documenting + t.Logf("") + t.Logf("KNOWN ISSUE: totalFreeEcSlots = %d (< 1)", totalFreeEcSlots) + t.Logf("") + t.Logf("This triggers the error: 'no free ec shard slots. only %d left'", totalFreeEcSlots) + t.Logf("") + t.Logf("Analysis:") + t.Logf(" - %d EC shards across %d servers", totalEcShards, len(ecNodes)) + t.Logf(" - Shards are concentrated on original servers (not distributed)") + t.Logf(" - Current slot calculation doesn't account for slots freed by moving shards") + t.Logf("") + t.Logf("Expected fix:") + t.Logf(" 1. Rebalance should work iteratively, moving one shard at a time") + t.Logf(" 2. Moving a shard from A to B: frees 1 slot on A, uses 1 slot on B") + t.Logf(" 3. The 'free slots' check should be per-move, not global") + t.Logf(" 4. 
Or: calculate 'redistributable slots' = total capacity - shards that must stay") + + // For now, document this is a known issue - don't fail the test + // When the fix is implemented, this test should be updated to verify the fix works + return + } + + // If we get here, the issue might have been fixed + t.Logf("totalFreeEcSlots = %d, rebalance should be possible", totalFreeEcSlots) +} + +// TestECRebalanceZeroFreeSlots tests the specific scenario where +// the topology appears to have free slots but rebalance fails. +// +// This can happen when the VolumeCount in the topology includes the original +// volumes that were EC-encoded, making the free slot calculation incorrect. +func TestECRebalanceZeroFreeSlots(t *testing.T) { + // Build a topology where volumes were NOT deleted after EC encoding + // (VolumeCount still reflects the original volumes) + topology := buildZeroFreeSlotTopology() + + ecNodes, totalFreeEcSlots := collectEcVolumeServersByDc(topology, "", types.HardDriveType) + + t.Logf("Zero free slots scenario:") + for _, node := range ecNodes { + shardCount := 0 + for _, diskInfo := range node.info.DiskInfos { + for _, ecShard := range diskInfo.EcShardInfos { + shardCount += erasure_coding.ShardBits(ecShard.EcIndexBits).ShardIdCount() + } + } + t.Logf(" Node %s: %d shards, %d free slots, volumeCount=%d, max=%d", + node.info.Id, shardCount, node.freeEcSlot, + node.info.DiskInfos[string(types.HardDriveType)].VolumeCount, + node.info.DiskInfos[string(types.HardDriveType)].MaxVolumeCount) + } + t.Logf(" Total free slots: %d", totalFreeEcSlots) + + if totalFreeEcSlots == 0 { + t.Logf("") + t.Logf("SCENARIO REPRODUCED: totalFreeEcSlots = 0") + t.Logf("This would trigger: 'no free ec shard slots. only 0 left'") + } +} + +// buildZeroFreeSlotTopology creates a topology where rebalance will fail +// because servers are at capacity (volumeCount equals maxVolumeCount) +func buildZeroFreeSlotTopology() *master_pb.TopologyInfo { + diskTypeKey := string(types.HardDriveType) + + // Each server has max=10, volumeCount=10 (full capacity) + // Free capacity = (10-10) * 10 = 0 per server + // This will trigger "no free ec shard slots" error + return &master_pb.TopologyInfo{ + Id: "test_zero_free_slots", + DataCenterInfos: []*master_pb.DataCenterInfo{ + { + Id: "dc1", + RackInfos: []*master_pb.RackInfo{ + { + Id: "rack0", + DataNodeInfos: []*master_pb.DataNodeInfo{ + { + Id: "127.0.0.1:8080", + DiskInfos: map[string]*master_pb.DiskInfo{ + diskTypeKey: { + Type: diskTypeKey, + MaxVolumeCount: 10, + VolumeCount: 10, // At full capacity + EcShardInfos: buildEcShards([]uint32{3, 4}), + }, + }, + }, + }, + }, + { + Id: "rack1", + DataNodeInfos: []*master_pb.DataNodeInfo{ + { + Id: "127.0.0.1:8081", + DiskInfos: map[string]*master_pb.DiskInfo{ + diskTypeKey: { + Type: diskTypeKey, + MaxVolumeCount: 10, + VolumeCount: 10, + EcShardInfos: buildEcShards([]uint32{1, 7}), + }, + }, + }, + }, + }, + { + Id: "rack2", + DataNodeInfos: []*master_pb.DataNodeInfo{ + { + Id: "127.0.0.1:8082", + DiskInfos: map[string]*master_pb.DiskInfo{ + diskTypeKey: { + Type: diskTypeKey, + MaxVolumeCount: 10, + VolumeCount: 10, + EcShardInfos: buildEcShards([]uint32{2}), + }, + }, + }, + }, + }, + { + Id: "rack3", + DataNodeInfos: []*master_pb.DataNodeInfo{ + { + Id: "127.0.0.1:8083", + DiskInfos: map[string]*master_pb.DiskInfo{ + diskTypeKey: { + Type: diskTypeKey, + MaxVolumeCount: 10, + VolumeCount: 10, + EcShardInfos: buildEcShards([]uint32{5, 6}), + }, + }, + }, + }, + }, + }, + }, + }, + } +} + +func buildEcShards(volumeIds 
[]uint32) []*master_pb.VolumeEcShardInformationMessage { + var shards []*master_pb.VolumeEcShardInformationMessage + for _, vid := range volumeIds { + allShardBits := erasure_coding.ShardBits(0) + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + allShardBits = allShardBits.AddShardId(erasure_coding.ShardId(i)) + } + shards = append(shards, &master_pb.VolumeEcShardInformationMessage{ + Id: vid, + Collection: "ectest", + EcIndexBits: uint32(allShardBits), + }) + } + return shards +} + +// buildLimitedSlotsTopology creates a topology matching the problematic scenario: +// - 6 servers in 6 racks +// - Each server has max=10 volume slots +// - 7 volumes were EC encoded, shards distributed as follows: +// - rack0 (8080): volumes 3,4 → 28 shards +// - rack1 (8081): volumes 1,7 → 28 shards +// - rack2 (8082): volume 2 → 14 shards +// - rack3 (8083): volumes 5,6 → 28 shards +// - rack4 (8084): (no volumes originally) +// - rack5 (8085): (no volumes originally) +func buildLimitedSlotsTopology() *master_pb.TopologyInfo { + return &master_pb.TopologyInfo{ + Id: "test_limited_slots", + DataCenterInfos: []*master_pb.DataCenterInfo{ + { + Id: "dc1", + RackInfos: []*master_pb.RackInfo{ + buildRackWithEcShards("rack0", "127.0.0.1:8080", 10, []uint32{3, 4}), + buildRackWithEcShards("rack1", "127.0.0.1:8081", 10, []uint32{1, 7}), + buildRackWithEcShards("rack2", "127.0.0.1:8082", 10, []uint32{2}), + buildRackWithEcShards("rack3", "127.0.0.1:8083", 10, []uint32{5, 6}), + buildRackWithEcShards("rack4", "127.0.0.1:8084", 10, []uint32{}), + buildRackWithEcShards("rack5", "127.0.0.1:8085", 10, []uint32{}), + }, + }, + }, + } +} + +// buildRackWithEcShards creates a rack with one data node containing EC shards +// for the specified volume IDs (all 14 shards per volume) +func buildRackWithEcShards(rackId, nodeId string, maxVolumes int64, volumeIds []uint32) *master_pb.RackInfo { + // Note: types.HardDriveType is "" (empty string), so we use "" as the key + diskTypeKey := string(types.HardDriveType) + + return &master_pb.RackInfo{ + Id: rackId, + DataNodeInfos: []*master_pb.DataNodeInfo{ + { + Id: nodeId, + DiskInfos: map[string]*master_pb.DiskInfo{ + diskTypeKey: { + Type: diskTypeKey, + MaxVolumeCount: maxVolumes, + VolumeCount: int64(len(volumeIds)), // Original volumes still counted + EcShardInfos: buildEcShards(volumeIds), + }, + }, + }, + }, + } +} diff --git a/weed/storage/erasure_coding/distribution/README.md b/weed/storage/erasure_coding/distribution/README.md new file mode 100644 index 000000000..91cfd8fe8 --- /dev/null +++ b/weed/storage/erasure_coding/distribution/README.md @@ -0,0 +1,209 @@ +# EC Distribution Package + +This package provides erasure coding (EC) shard distribution algorithms that are: + +- **Configurable**: Works with any EC ratio (e.g., 10+4, 8+4, 6+3) +- **Reusable**: Used by shell commands, worker tasks, and seaweed-enterprise +- **Topology-aware**: Distributes shards across data centers, racks, and nodes proportionally + +## Usage + +### Basic Usage with Default 10+4 EC + +```go +import ( + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/distribution" +) + +// Parse replication policy +rep, _ := distribution.NewReplicationConfigFromString("110") + +// Use default 10+4 EC configuration +ec := distribution.DefaultECConfig() + +// Calculate distribution plan +dist := distribution.CalculateDistribution(ec, rep) + +fmt.Println(dist.Summary()) +// Output: +// EC Configuration: 10+4 (total: 14, can lose: 4) +// Replication: replication=110 (DCs:2, Racks/DC:2, 
Nodes/Rack:1) +// Distribution Plan: +// Data Centers: 2 (target 7 shards each, max 9) +// Racks per DC: 2 (target 4 shards each, max 6) +// Nodes per Rack: 1 (target 4 shards each, max 6) +``` + +### Custom EC Ratios (seaweed-enterprise) + +```go +// Create custom 8+4 EC configuration +ec, err := distribution.NewECConfig(8, 4) +if err != nil { + log.Fatal(err) +} + +rep, _ := distribution.NewReplicationConfigFromString("200") +dist := distribution.CalculateDistribution(ec, rep) + +// Check fault tolerance +fmt.Println(dist.FaultToleranceAnalysis()) +// Output: +// Fault Tolerance Analysis for 8+4: +// DC Failure: SURVIVABLE ✓ +// - Losing one DC loses ~4 shards +// - Remaining: 8 shards (need 8) +``` + +### Planning Shard Moves + +```go +// Build topology analysis +analysis := distribution.NewTopologyAnalysis() + +// Add nodes and their shard locations +for _, node := range nodes { + analysis.AddNode(&distribution.TopologyNode{ + NodeID: node.ID, + DataCenter: node.DC, + Rack: node.Rack, + FreeSlots: node.FreeSlots, + }) + for _, shardID := range node.ShardIDs { + analysis.AddShardLocation(distribution.ShardLocation{ + ShardID: shardID, + NodeID: node.ID, + DataCenter: node.DC, + Rack: node.Rack, + }) + } +} +analysis.Finalize() + +// Create rebalancer and plan moves +rebalancer := distribution.NewRebalancer(ec, rep) +plan, err := rebalancer.PlanRebalance(analysis) + +for _, move := range plan.Moves { + fmt.Printf("Move shard %d from %s to %s\n", + move.ShardID, move.SourceNode.NodeID, move.DestNode.NodeID) +} +``` + +## Algorithm + +### Proportional Distribution + +The replication policy `XYZ` is interpreted as a ratio: + +| Replication | DCs | Racks/DC | Nodes/Rack | 14 Shards Distribution | +|-------------|-----|----------|------------|------------------------| +| `000` | 1 | 1 | 1 | All in one place | +| `001` | 1 | 1 | 2 | 7 per node | +| `010` | 1 | 2 | 1 | 7 per rack | +| `100` | 2 | 1 | 1 | 7 per DC | +| `110` | 2 | 2 | 1 | 7/DC, 4/rack | +| `200` | 3 | 1 | 1 | 5 per DC | + +### Rebalancing Process + +1. **DC-level balancing**: Move shards to achieve target shards per DC +2. **Rack-level balancing**: Within each DC, balance across racks +3. **Node-level balancing**: Within each rack, balance across nodes + +### Shard Priority: Data First, Parity Moves First + +When rebalancing, the algorithm prioritizes keeping data shards spread out: + +- **Data shards (0 to DataShards-1)**: Serve read requests directly +- **Parity shards (DataShards to TotalShards-1)**: Only used for reconstruction + +**Rebalancing Strategy**: +- When moving shards FROM an overloaded node, **parity shards are moved first** +- This keeps data shards in place on well-distributed nodes +- Result: Data shards remain spread out for optimal read performance + +```go +// Check shard type +if ec.IsDataShard(shardID) { + // Shard serves read requests +} +if ec.IsParityShard(shardID) { + // Shard only used for reconstruction +} + +// Sort shards for placement (data first for initial distribution) +sorted := ec.SortShardsDataFirst(shards) + +// Sort shards for rebalancing (parity first to move them away) +sorted := ec.SortShardsParityFirst(shards) +``` + +### Fault Tolerance + +The package provides fault tolerance analysis: + +- **DC Failure**: Can the data survive complete DC loss? +- **Rack Failure**: Can the data survive complete rack loss? +- **Node Failure**: Can the data survive single node loss? 
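These checks are also exposed programmatically via `CanSurviveDCFailure`, `CanSurviveRackFailure`, and `MinDCsForDCFaultTolerance`. A minimal sketch, assuming the default 10+4 configuration and a `110` replication policy:

```go
ec := distribution.DefaultECConfig()
rep, _ := distribution.NewReplicationConfigFromString("110")
dist := distribution.CalculateDistribution(ec, rep)

if !dist.CanSurviveDCFailure() {
    fmt.Printf("need at least %d DCs to survive losing a DC\n", dist.MinDCsForDCFaultTolerance())
}
if dist.CanSurviveRackFailure() {
    fmt.Println("a single rack loss is survivable")
}
```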
+ +For example, with 10+4 EC (can lose 4 shards): +- Need 4+ DCs for DC-level fault tolerance +- Need 4+ racks for rack-level fault tolerance +- Usually survivable at node level + +## API Reference + +### Types + +- `ECConfig`: EC configuration (data shards, parity shards) +- `ReplicationConfig`: Parsed replication policy +- `ECDistribution`: Calculated distribution plan +- `TopologyAnalysis`: Current shard distribution analysis +- `Rebalancer`: Plans shard moves +- `RebalancePlan`: List of planned moves +- `ShardMove`: Single shard move operation + +### Key Functions + +- `NewECConfig(data, parity int)`: Create EC configuration +- `DefaultECConfig()`: Returns 10+4 configuration +- `CalculateDistribution(ec, rep)`: Calculate distribution plan +- `NewRebalancer(ec, rep)`: Create rebalancer +- `PlanRebalance(analysis)`: Generate rebalancing plan + +## Integration + +### Shell Commands + +The shell package wraps this distribution package for `ec.balance`: + +```go +import "github.com/seaweedfs/seaweedfs/weed/shell" + +rebalancer := shell.NewProportionalECRebalancer(nodes, rp, diskType) +moves, _ := rebalancer.PlanMoves(volumeId, locations) +``` + +### Worker Tasks + +Worker tasks can use the distribution package directly: + +```go +import "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/distribution" + +ec := distribution.ECConfig{DataShards: 8, ParityShards: 4} +rep := distribution.NewReplicationConfig(rp) +dist := distribution.CalculateDistribution(ec, rep) +``` + +### seaweed-enterprise + +Enterprise features can provide custom EC configurations: + +```go +// Custom EC ratio from license/config +ec, _ := distribution.NewECConfig(customData, customParity) +rebalancer := distribution.NewRebalancer(ec, rep) +``` + diff --git a/weed/storage/erasure_coding/distribution/analysis.go b/weed/storage/erasure_coding/distribution/analysis.go new file mode 100644 index 000000000..6b1f80053 --- /dev/null +++ b/weed/storage/erasure_coding/distribution/analysis.go @@ -0,0 +1,241 @@ +package distribution + +import ( + "fmt" + "slices" +) + +// ShardLocation represents where a shard is located in the topology +type ShardLocation struct { + ShardID int + NodeID string + DataCenter string + Rack string +} + +// TopologyNode represents a node in the topology that can hold EC shards +type TopologyNode struct { + NodeID string + DataCenter string + Rack string + FreeSlots int // Available slots for new shards + ShardIDs []int // Shard IDs currently on this node for a specific volume + TotalShards int // Total shards on this node (for all volumes) +} + +// TopologyAnalysis holds the current shard distribution analysis for a volume +type TopologyAnalysis struct { + // Shard counts at each level + ShardsByDC map[string]int + ShardsByRack map[string]int + ShardsByNode map[string]int + + // Detailed shard locations + DCToShards map[string][]int // DC -> list of shard IDs + RackToShards map[string][]int // Rack -> list of shard IDs + NodeToShards map[string][]int // NodeID -> list of shard IDs + + // Topology structure + DCToRacks map[string][]string // DC -> list of rack IDs + RackToNodes map[string][]*TopologyNode // Rack -> list of nodes + AllNodes map[string]*TopologyNode // NodeID -> node info + + // Statistics + TotalShards int + TotalNodes int + TotalRacks int + TotalDCs int +} + +// NewTopologyAnalysis creates a new empty analysis +func NewTopologyAnalysis() *TopologyAnalysis { + return &TopologyAnalysis{ + ShardsByDC: make(map[string]int), + ShardsByRack: make(map[string]int), + ShardsByNode: 
make(map[string]int), + DCToShards: make(map[string][]int), + RackToShards: make(map[string][]int), + NodeToShards: make(map[string][]int), + DCToRacks: make(map[string][]string), + RackToNodes: make(map[string][]*TopologyNode), + AllNodes: make(map[string]*TopologyNode), + } +} + +// AddShardLocation adds a shard location to the analysis +func (a *TopologyAnalysis) AddShardLocation(loc ShardLocation) { + // Update counts + a.ShardsByDC[loc.DataCenter]++ + a.ShardsByRack[loc.Rack]++ + a.ShardsByNode[loc.NodeID]++ + + // Update shard lists + a.DCToShards[loc.DataCenter] = append(a.DCToShards[loc.DataCenter], loc.ShardID) + a.RackToShards[loc.Rack] = append(a.RackToShards[loc.Rack], loc.ShardID) + a.NodeToShards[loc.NodeID] = append(a.NodeToShards[loc.NodeID], loc.ShardID) + + a.TotalShards++ +} + +// AddNode adds a node to the topology (even if it has no shards) +func (a *TopologyAnalysis) AddNode(node *TopologyNode) { + if _, exists := a.AllNodes[node.NodeID]; exists { + return // Already added + } + + a.AllNodes[node.NodeID] = node + a.TotalNodes++ + + // Update topology structure + if !slices.Contains(a.DCToRacks[node.DataCenter], node.Rack) { + a.DCToRacks[node.DataCenter] = append(a.DCToRacks[node.DataCenter], node.Rack) + } + a.RackToNodes[node.Rack] = append(a.RackToNodes[node.Rack], node) + + // Update counts + if _, exists := a.ShardsByDC[node.DataCenter]; !exists { + a.TotalDCs++ + } + if _, exists := a.ShardsByRack[node.Rack]; !exists { + a.TotalRacks++ + } +} + +// Finalize computes final statistics after all data is added +func (a *TopologyAnalysis) Finalize() { + // Ensure we have accurate DC and rack counts + dcSet := make(map[string]bool) + rackSet := make(map[string]bool) + for _, node := range a.AllNodes { + dcSet[node.DataCenter] = true + rackSet[node.Rack] = true + } + a.TotalDCs = len(dcSet) + a.TotalRacks = len(rackSet) + a.TotalNodes = len(a.AllNodes) +} + +// String returns a summary of the analysis +func (a *TopologyAnalysis) String() string { + return fmt.Sprintf("TopologyAnalysis{shards:%d, nodes:%d, racks:%d, dcs:%d}", + a.TotalShards, a.TotalNodes, a.TotalRacks, a.TotalDCs) +} + +// DetailedString returns a detailed multi-line summary +func (a *TopologyAnalysis) DetailedString() string { + s := fmt.Sprintf("Topology Analysis:\n") + s += fmt.Sprintf(" Total Shards: %d\n", a.TotalShards) + s += fmt.Sprintf(" Data Centers: %d\n", a.TotalDCs) + for dc, count := range a.ShardsByDC { + s += fmt.Sprintf(" %s: %d shards\n", dc, count) + } + s += fmt.Sprintf(" Racks: %d\n", a.TotalRacks) + for rack, count := range a.ShardsByRack { + s += fmt.Sprintf(" %s: %d shards\n", rack, count) + } + s += fmt.Sprintf(" Nodes: %d\n", a.TotalNodes) + for nodeID, count := range a.ShardsByNode { + if count > 0 { + s += fmt.Sprintf(" %s: %d shards\n", nodeID, count) + } + } + return s +} + +// TopologyExcess represents a topology level (DC/rack/node) with excess shards +type TopologyExcess struct { + ID string // DC/rack/node ID + Level string // "dc", "rack", or "node" + Excess int // Number of excess shards (above target) + Shards []int // Shard IDs at this level + Nodes []*TopologyNode // Nodes at this level (for finding sources) +} + +// CalculateDCExcess returns DCs with more shards than the target +func CalculateDCExcess(analysis *TopologyAnalysis, dist *ECDistribution) []TopologyExcess { + var excess []TopologyExcess + + for dc, count := range analysis.ShardsByDC { + if count > dist.TargetShardsPerDC { + // Collect nodes in this DC + var nodes []*TopologyNode + for _, rack := range 
analysis.DCToRacks[dc] { + nodes = append(nodes, analysis.RackToNodes[rack]...) + } + excess = append(excess, TopologyExcess{ + ID: dc, + Level: "dc", + Excess: count - dist.TargetShardsPerDC, + Shards: analysis.DCToShards[dc], + Nodes: nodes, + }) + } + } + + // Sort by excess (most excess first) + slices.SortFunc(excess, func(a, b TopologyExcess) int { + return b.Excess - a.Excess + }) + + return excess +} + +// CalculateRackExcess returns racks with more shards than the target (within a DC) +func CalculateRackExcess(analysis *TopologyAnalysis, dc string, targetPerRack int) []TopologyExcess { + var excess []TopologyExcess + + for _, rack := range analysis.DCToRacks[dc] { + count := analysis.ShardsByRack[rack] + if count > targetPerRack { + excess = append(excess, TopologyExcess{ + ID: rack, + Level: "rack", + Excess: count - targetPerRack, + Shards: analysis.RackToShards[rack], + Nodes: analysis.RackToNodes[rack], + }) + } + } + + slices.SortFunc(excess, func(a, b TopologyExcess) int { + return b.Excess - a.Excess + }) + + return excess +} + +// CalculateUnderservedDCs returns DCs that have fewer shards than target +func CalculateUnderservedDCs(analysis *TopologyAnalysis, dist *ECDistribution) []string { + var underserved []string + + // Check existing DCs + for dc, count := range analysis.ShardsByDC { + if count < dist.TargetShardsPerDC { + underserved = append(underserved, dc) + } + } + + // Check DCs with nodes but no shards + for dc := range analysis.DCToRacks { + if _, exists := analysis.ShardsByDC[dc]; !exists { + underserved = append(underserved, dc) + } + } + + return underserved +} + +// CalculateUnderservedRacks returns racks that have fewer shards than target +func CalculateUnderservedRacks(analysis *TopologyAnalysis, dc string, targetPerRack int) []string { + var underserved []string + + for _, rack := range analysis.DCToRacks[dc] { + count := analysis.ShardsByRack[rack] + if count < targetPerRack { + underserved = append(underserved, rack) + } + } + + return underserved +} + diff --git a/weed/storage/erasure_coding/distribution/config.go b/weed/storage/erasure_coding/distribution/config.go new file mode 100644 index 000000000..e89d6eeb6 --- /dev/null +++ b/weed/storage/erasure_coding/distribution/config.go @@ -0,0 +1,171 @@ +// Package distribution provides EC shard distribution algorithms with configurable EC ratios. +package distribution + +import ( + "fmt" + + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" +) + +// ECConfig holds erasure coding configuration parameters. +// This replaces hard-coded constants like DataShardsCount=10, ParityShardsCount=4. 
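+// For example, NewECConfig(8, 4) describes an 8+4 layout: 12 shards in total, any 8 of which suffice to reconstruct the data.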
+type ECConfig struct { + DataShards int // Number of data shards (e.g., 10) + ParityShards int // Number of parity shards (e.g., 4) +} + +// DefaultECConfig returns the standard 10+4 EC configuration +func DefaultECConfig() ECConfig { + return ECConfig{ + DataShards: 10, + ParityShards: 4, + } +} + +// NewECConfig creates a new EC configuration with validation +func NewECConfig(dataShards, parityShards int) (ECConfig, error) { + if dataShards <= 0 { + return ECConfig{}, fmt.Errorf("dataShards must be positive, got %d", dataShards) + } + if parityShards <= 0 { + return ECConfig{}, fmt.Errorf("parityShards must be positive, got %d", parityShards) + } + if dataShards+parityShards > 32 { + return ECConfig{}, fmt.Errorf("total shards (%d+%d=%d) exceeds maximum of 32", + dataShards, parityShards, dataShards+parityShards) + } + return ECConfig{ + DataShards: dataShards, + ParityShards: parityShards, + }, nil +} + +// TotalShards returns the total number of shards (data + parity) +func (c ECConfig) TotalShards() int { + return c.DataShards + c.ParityShards +} + +// MaxTolerableLoss returns the maximum number of shards that can be lost +// while still being able to reconstruct the data +func (c ECConfig) MaxTolerableLoss() int { + return c.ParityShards +} + +// MinShardsForReconstruction returns the minimum number of shards needed +// to reconstruct the original data +func (c ECConfig) MinShardsForReconstruction() int { + return c.DataShards +} + +// String returns a human-readable representation +func (c ECConfig) String() string { + return fmt.Sprintf("%d+%d (total: %d, can lose: %d)", + c.DataShards, c.ParityShards, c.TotalShards(), c.MaxTolerableLoss()) +} + +// IsDataShard returns true if the shard ID is a data shard (0 to DataShards-1) +func (c ECConfig) IsDataShard(shardID int) bool { + return shardID >= 0 && shardID < c.DataShards +} + +// IsParityShard returns true if the shard ID is a parity shard (DataShards to TotalShards-1) +func (c ECConfig) IsParityShard(shardID int) bool { + return shardID >= c.DataShards && shardID < c.TotalShards() +} + +// SortShardsDataFirst returns a copy of shards sorted with data shards first. +// This is useful for initial placement where data shards should be spread out first. +func (c ECConfig) SortShardsDataFirst(shards []int) []int { + result := make([]int, len(shards)) + copy(result, shards) + + // Partition: data shards first, then parity shards + dataIdx := 0 + parityIdx := len(result) - 1 + + sorted := make([]int, len(result)) + for _, s := range result { + if c.IsDataShard(s) { + sorted[dataIdx] = s + dataIdx++ + } else { + sorted[parityIdx] = s + parityIdx-- + } + } + + return sorted +} + +// SortShardsParityFirst returns a copy of shards sorted with parity shards first. +// This is useful for rebalancing where we prefer to move parity shards. 
+func (c ECConfig) SortShardsParityFirst(shards []int) []int { + result := make([]int, len(shards)) + copy(result, shards) + + // Partition: parity shards first, then data shards + parityIdx := 0 + dataIdx := len(result) - 1 + + sorted := make([]int, len(result)) + for _, s := range result { + if c.IsParityShard(s) { + sorted[parityIdx] = s + parityIdx++ + } else { + sorted[dataIdx] = s + dataIdx-- + } + } + + return sorted +} + +// ReplicationConfig holds the parsed replication policy +type ReplicationConfig struct { + MinDataCenters int // X+1 from XYZ replication (minimum DCs to use) + MinRacksPerDC int // Y+1 from XYZ replication (minimum racks per DC) + MinNodesPerRack int // Z+1 from XYZ replication (minimum nodes per rack) + + // Original replication string (for logging/debugging) + Original string +} + +// NewReplicationConfig creates a ReplicationConfig from a ReplicaPlacement +func NewReplicationConfig(rp *super_block.ReplicaPlacement) ReplicationConfig { + if rp == nil { + return ReplicationConfig{ + MinDataCenters: 1, + MinRacksPerDC: 1, + MinNodesPerRack: 1, + Original: "000", + } + } + return ReplicationConfig{ + MinDataCenters: rp.DiffDataCenterCount + 1, + MinRacksPerDC: rp.DiffRackCount + 1, + MinNodesPerRack: rp.SameRackCount + 1, + Original: rp.String(), + } +} + +// NewReplicationConfigFromString creates a ReplicationConfig from a replication string +func NewReplicationConfigFromString(replication string) (ReplicationConfig, error) { + rp, err := super_block.NewReplicaPlacementFromString(replication) + if err != nil { + return ReplicationConfig{}, err + } + return NewReplicationConfig(rp), nil +} + +// TotalPlacementSlots returns the minimum number of unique placement locations +// based on the replication policy +func (r ReplicationConfig) TotalPlacementSlots() int { + return r.MinDataCenters * r.MinRacksPerDC * r.MinNodesPerRack +} + +// String returns a human-readable representation +func (r ReplicationConfig) String() string { + return fmt.Sprintf("replication=%s (DCs:%d, Racks/DC:%d, Nodes/Rack:%d)", + r.Original, r.MinDataCenters, r.MinRacksPerDC, r.MinNodesPerRack) +} diff --git a/weed/storage/erasure_coding/distribution/distribution.go b/weed/storage/erasure_coding/distribution/distribution.go new file mode 100644 index 000000000..a49f10a1e --- /dev/null +++ b/weed/storage/erasure_coding/distribution/distribution.go @@ -0,0 +1,161 @@ +package distribution + +import ( + "fmt" +) + +// ECDistribution represents the target distribution of EC shards +// based on EC configuration and replication policy. +type ECDistribution struct { + // EC configuration + ECConfig ECConfig + + // Replication configuration + ReplicationConfig ReplicationConfig + + // Target shard counts per topology level (balanced distribution) + TargetShardsPerDC int + TargetShardsPerRack int + TargetShardsPerNode int + + // Maximum shard counts per topology level (fault tolerance limits) + // These prevent any single failure domain from having too many shards + MaxShardsPerDC int + MaxShardsPerRack int + MaxShardsPerNode int +} + +// CalculateDistribution computes the target EC shard distribution based on +// EC configuration and replication policy. +// +// The algorithm: +// 1. Uses replication policy to determine minimum topology spread +// 2. Calculates target shards per level (evenly distributed) +// 3. 
Calculates max shards per level (for fault tolerance) +func CalculateDistribution(ec ECConfig, rep ReplicationConfig) *ECDistribution { + totalShards := ec.TotalShards() + + // Target distribution (balanced, rounded up to ensure all shards placed) + targetShardsPerDC := ceilDivide(totalShards, rep.MinDataCenters) + targetShardsPerRack := ceilDivide(targetShardsPerDC, rep.MinRacksPerDC) + targetShardsPerNode := ceilDivide(targetShardsPerRack, rep.MinNodesPerRack) + + // Maximum limits for fault tolerance + // The key constraint: losing one failure domain shouldn't lose more than parityShards + // So max shards per domain = totalShards - parityShards + tolerance + // We add small tolerance (+2) to allow for imbalanced topologies + faultToleranceLimit := totalShards - ec.ParityShards + 1 + + maxShardsPerDC := min(faultToleranceLimit, targetShardsPerDC+2) + maxShardsPerRack := min(faultToleranceLimit, targetShardsPerRack+2) + maxShardsPerNode := min(faultToleranceLimit, targetShardsPerNode+2) + + return &ECDistribution{ + ECConfig: ec, + ReplicationConfig: rep, + TargetShardsPerDC: targetShardsPerDC, + TargetShardsPerRack: targetShardsPerRack, + TargetShardsPerNode: targetShardsPerNode, + MaxShardsPerDC: maxShardsPerDC, + MaxShardsPerRack: maxShardsPerRack, + MaxShardsPerNode: maxShardsPerNode, + } +} + +// String returns a human-readable description of the distribution +func (d *ECDistribution) String() string { + return fmt.Sprintf( + "ECDistribution{EC:%s, DCs:%d (target:%d/max:%d), Racks/DC:%d (target:%d/max:%d), Nodes/Rack:%d (target:%d/max:%d)}", + d.ECConfig.String(), + d.ReplicationConfig.MinDataCenters, d.TargetShardsPerDC, d.MaxShardsPerDC, + d.ReplicationConfig.MinRacksPerDC, d.TargetShardsPerRack, d.MaxShardsPerRack, + d.ReplicationConfig.MinNodesPerRack, d.TargetShardsPerNode, d.MaxShardsPerNode, + ) +} + +// Summary returns a multi-line summary of the distribution plan +func (d *ECDistribution) Summary() string { + summary := fmt.Sprintf("EC Configuration: %s\n", d.ECConfig.String()) + summary += fmt.Sprintf("Replication: %s\n", d.ReplicationConfig.String()) + summary += fmt.Sprintf("Distribution Plan:\n") + summary += fmt.Sprintf(" Data Centers: %d (target %d shards each, max %d)\n", + d.ReplicationConfig.MinDataCenters, d.TargetShardsPerDC, d.MaxShardsPerDC) + summary += fmt.Sprintf(" Racks per DC: %d (target %d shards each, max %d)\n", + d.ReplicationConfig.MinRacksPerDC, d.TargetShardsPerRack, d.MaxShardsPerRack) + summary += fmt.Sprintf(" Nodes per Rack: %d (target %d shards each, max %d)\n", + d.ReplicationConfig.MinNodesPerRack, d.TargetShardsPerNode, d.MaxShardsPerNode) + return summary +} + +// CanSurviveDCFailure returns true if the distribution can survive +// complete loss of one data center +func (d *ECDistribution) CanSurviveDCFailure() bool { + // After losing one DC with max shards, check if remaining shards are enough + remainingAfterDCLoss := d.ECConfig.TotalShards() - d.TargetShardsPerDC + return remainingAfterDCLoss >= d.ECConfig.MinShardsForReconstruction() +} + +// CanSurviveRackFailure returns true if the distribution can survive +// complete loss of one rack +func (d *ECDistribution) CanSurviveRackFailure() bool { + remainingAfterRackLoss := d.ECConfig.TotalShards() - d.TargetShardsPerRack + return remainingAfterRackLoss >= d.ECConfig.MinShardsForReconstruction() +} + +// MinDCsForDCFaultTolerance calculates the minimum number of DCs needed +// to survive complete DC failure with this EC configuration +func (d *ECDistribution) MinDCsForDCFaultTolerance() 
int { + // To survive DC failure, max shards per DC = parityShards + maxShardsPerDC := d.ECConfig.MaxTolerableLoss() + if maxShardsPerDC == 0 { + return d.ECConfig.TotalShards() // Would need one DC per shard + } + return ceilDivide(d.ECConfig.TotalShards(), maxShardsPerDC) +} + +// FaultToleranceAnalysis returns a detailed analysis of fault tolerance +func (d *ECDistribution) FaultToleranceAnalysis() string { + analysis := fmt.Sprintf("Fault Tolerance Analysis for %s:\n", d.ECConfig.String()) + + // DC failure + dcSurvive := d.CanSurviveDCFailure() + shardsAfterDC := d.ECConfig.TotalShards() - d.TargetShardsPerDC + analysis += fmt.Sprintf(" DC Failure: %s\n", boolToResult(dcSurvive)) + analysis += fmt.Sprintf(" - Losing one DC loses ~%d shards\n", d.TargetShardsPerDC) + analysis += fmt.Sprintf(" - Remaining: %d shards (need %d)\n", shardsAfterDC, d.ECConfig.DataShards) + if !dcSurvive { + analysis += fmt.Sprintf(" - Need at least %d DCs for DC fault tolerance\n", d.MinDCsForDCFaultTolerance()) + } + + // Rack failure + rackSurvive := d.CanSurviveRackFailure() + shardsAfterRack := d.ECConfig.TotalShards() - d.TargetShardsPerRack + analysis += fmt.Sprintf(" Rack Failure: %s\n", boolToResult(rackSurvive)) + analysis += fmt.Sprintf(" - Losing one rack loses ~%d shards\n", d.TargetShardsPerRack) + analysis += fmt.Sprintf(" - Remaining: %d shards (need %d)\n", shardsAfterRack, d.ECConfig.DataShards) + + // Node failure (usually survivable) + shardsAfterNode := d.ECConfig.TotalShards() - d.TargetShardsPerNode + nodeSurvive := shardsAfterNode >= d.ECConfig.DataShards + analysis += fmt.Sprintf(" Node Failure: %s\n", boolToResult(nodeSurvive)) + analysis += fmt.Sprintf(" - Losing one node loses ~%d shards\n", d.TargetShardsPerNode) + analysis += fmt.Sprintf(" - Remaining: %d shards (need %d)\n", shardsAfterNode, d.ECConfig.DataShards) + + return analysis +} + +func boolToResult(b bool) string { + if b { + return "SURVIVABLE ✓" + } + return "NOT SURVIVABLE ✗" +} + +// ceilDivide performs ceiling division +func ceilDivide(a, b int) int { + if b <= 0 { + return a + } + return (a + b - 1) / b +} + diff --git a/weed/storage/erasure_coding/distribution/distribution_test.go b/weed/storage/erasure_coding/distribution/distribution_test.go new file mode 100644 index 000000000..dc6a19192 --- /dev/null +++ b/weed/storage/erasure_coding/distribution/distribution_test.go @@ -0,0 +1,565 @@ +package distribution + +import ( + "testing" +) + +func TestNewECConfig(t *testing.T) { + tests := []struct { + name string + dataShards int + parityShards int + wantErr bool + }{ + {"valid 10+4", 10, 4, false}, + {"valid 8+4", 8, 4, false}, + {"valid 6+3", 6, 3, false}, + {"valid 4+2", 4, 2, false}, + {"invalid data=0", 0, 4, true}, + {"invalid parity=0", 10, 0, true}, + {"invalid total>32", 20, 15, true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + config, err := NewECConfig(tt.dataShards, tt.parityShards) + if (err != nil) != tt.wantErr { + t.Errorf("NewECConfig() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !tt.wantErr { + if config.DataShards != tt.dataShards { + t.Errorf("DataShards = %d, want %d", config.DataShards, tt.dataShards) + } + if config.ParityShards != tt.parityShards { + t.Errorf("ParityShards = %d, want %d", config.ParityShards, tt.parityShards) + } + if config.TotalShards() != tt.dataShards+tt.parityShards { + t.Errorf("TotalShards() = %d, want %d", config.TotalShards(), tt.dataShards+tt.parityShards) + } + } + }) + } +} + +func TestCalculateDistribution(t 
*testing.T) { + tests := []struct { + name string + ecConfig ECConfig + replication string + expectedMinDCs int + expectedMinRacksPerDC int + expectedMinNodesPerRack int + expectedTargetPerDC int + expectedTargetPerRack int + expectedTargetPerNode int + }{ + { + name: "10+4 with 000", + ecConfig: DefaultECConfig(), + replication: "000", + expectedMinDCs: 1, + expectedMinRacksPerDC: 1, + expectedMinNodesPerRack: 1, + expectedTargetPerDC: 14, + expectedTargetPerRack: 14, + expectedTargetPerNode: 14, + }, + { + name: "10+4 with 100", + ecConfig: DefaultECConfig(), + replication: "100", + expectedMinDCs: 2, + expectedMinRacksPerDC: 1, + expectedMinNodesPerRack: 1, + expectedTargetPerDC: 7, + expectedTargetPerRack: 7, + expectedTargetPerNode: 7, + }, + { + name: "10+4 with 110", + ecConfig: DefaultECConfig(), + replication: "110", + expectedMinDCs: 2, + expectedMinRacksPerDC: 2, + expectedMinNodesPerRack: 1, + expectedTargetPerDC: 7, + expectedTargetPerRack: 4, + expectedTargetPerNode: 4, + }, + { + name: "10+4 with 200", + ecConfig: DefaultECConfig(), + replication: "200", + expectedMinDCs: 3, + expectedMinRacksPerDC: 1, + expectedMinNodesPerRack: 1, + expectedTargetPerDC: 5, + expectedTargetPerRack: 5, + expectedTargetPerNode: 5, + }, + { + name: "8+4 with 110", + ecConfig: ECConfig{ + DataShards: 8, + ParityShards: 4, + }, + replication: "110", + expectedMinDCs: 2, + expectedMinRacksPerDC: 2, + expectedMinNodesPerRack: 1, + expectedTargetPerDC: 6, // 12/2 = 6 + expectedTargetPerRack: 3, // 6/2 = 3 + expectedTargetPerNode: 3, + }, + { + name: "6+3 with 100", + ecConfig: ECConfig{ + DataShards: 6, + ParityShards: 3, + }, + replication: "100", + expectedMinDCs: 2, + expectedMinRacksPerDC: 1, + expectedMinNodesPerRack: 1, + expectedTargetPerDC: 5, // ceil(9/2) = 5 + expectedTargetPerRack: 5, + expectedTargetPerNode: 5, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rep, err := NewReplicationConfigFromString(tt.replication) + if err != nil { + t.Fatalf("Failed to parse replication %s: %v", tt.replication, err) + } + + dist := CalculateDistribution(tt.ecConfig, rep) + + if dist.ReplicationConfig.MinDataCenters != tt.expectedMinDCs { + t.Errorf("MinDataCenters = %d, want %d", dist.ReplicationConfig.MinDataCenters, tt.expectedMinDCs) + } + if dist.ReplicationConfig.MinRacksPerDC != tt.expectedMinRacksPerDC { + t.Errorf("MinRacksPerDC = %d, want %d", dist.ReplicationConfig.MinRacksPerDC, tt.expectedMinRacksPerDC) + } + if dist.ReplicationConfig.MinNodesPerRack != tt.expectedMinNodesPerRack { + t.Errorf("MinNodesPerRack = %d, want %d", dist.ReplicationConfig.MinNodesPerRack, tt.expectedMinNodesPerRack) + } + if dist.TargetShardsPerDC != tt.expectedTargetPerDC { + t.Errorf("TargetShardsPerDC = %d, want %d", dist.TargetShardsPerDC, tt.expectedTargetPerDC) + } + if dist.TargetShardsPerRack != tt.expectedTargetPerRack { + t.Errorf("TargetShardsPerRack = %d, want %d", dist.TargetShardsPerRack, tt.expectedTargetPerRack) + } + if dist.TargetShardsPerNode != tt.expectedTargetPerNode { + t.Errorf("TargetShardsPerNode = %d, want %d", dist.TargetShardsPerNode, tt.expectedTargetPerNode) + } + + t.Logf("Distribution for %s: %s", tt.name, dist.String()) + }) + } +} + +func TestFaultToleranceAnalysis(t *testing.T) { + tests := []struct { + name string + ecConfig ECConfig + replication string + canSurviveDC bool + canSurviveRack bool + }{ + // 10+4 = 14 shards, need 10 to reconstruct, can lose 4 + {"10+4 000", DefaultECConfig(), "000", false, false}, // All in one, any failure is 
fatal + {"10+4 100", DefaultECConfig(), "100", false, false}, // 7 per DC/rack, 7 remaining < 10 + {"10+4 200", DefaultECConfig(), "200", false, false}, // 5 per DC/rack, 9 remaining < 10 + {"10+4 110", DefaultECConfig(), "110", false, true}, // 4 per rack, 10 remaining = enough for rack + + // 8+4 = 12 shards, need 8 to reconstruct, can lose 4 + {"8+4 100", ECConfig{8, 4}, "100", false, false}, // 6 per DC/rack, 6 remaining < 8 + {"8+4 200", ECConfig{8, 4}, "200", true, true}, // 4 per DC/rack, 8 remaining = enough! + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rep, _ := NewReplicationConfigFromString(tt.replication) + dist := CalculateDistribution(tt.ecConfig, rep) + + if dist.CanSurviveDCFailure() != tt.canSurviveDC { + t.Errorf("CanSurviveDCFailure() = %v, want %v", dist.CanSurviveDCFailure(), tt.canSurviveDC) + } + if dist.CanSurviveRackFailure() != tt.canSurviveRack { + t.Errorf("CanSurviveRackFailure() = %v, want %v", dist.CanSurviveRackFailure(), tt.canSurviveRack) + } + + t.Log(dist.FaultToleranceAnalysis()) + }) + } +} + +func TestMinDCsForDCFaultTolerance(t *testing.T) { + tests := []struct { + name string + ecConfig ECConfig + minDCs int + }{ + // 10+4: can lose 4, so max 4 per DC, 14/4 = 4 DCs needed + {"10+4", DefaultECConfig(), 4}, + // 8+4: can lose 4, so max 4 per DC, 12/4 = 3 DCs needed + {"8+4", ECConfig{8, 4}, 3}, + // 6+3: can lose 3, so max 3 per DC, 9/3 = 3 DCs needed + {"6+3", ECConfig{6, 3}, 3}, + // 4+2: can lose 2, so max 2 per DC, 6/2 = 3 DCs needed + {"4+2", ECConfig{4, 2}, 3}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rep, _ := NewReplicationConfigFromString("000") + dist := CalculateDistribution(tt.ecConfig, rep) + + if dist.MinDCsForDCFaultTolerance() != tt.minDCs { + t.Errorf("MinDCsForDCFaultTolerance() = %d, want %d", + dist.MinDCsForDCFaultTolerance(), tt.minDCs) + } + + t.Logf("%s: needs %d DCs for DC fault tolerance", tt.name, dist.MinDCsForDCFaultTolerance()) + }) + } +} + +func TestTopologyAnalysis(t *testing.T) { + analysis := NewTopologyAnalysis() + + // Add nodes to topology + node1 := &TopologyNode{ + NodeID: "node1", + DataCenter: "dc1", + Rack: "rack1", + FreeSlots: 5, + } + node2 := &TopologyNode{ + NodeID: "node2", + DataCenter: "dc1", + Rack: "rack2", + FreeSlots: 10, + } + node3 := &TopologyNode{ + NodeID: "node3", + DataCenter: "dc2", + Rack: "rack3", + FreeSlots: 10, + } + + analysis.AddNode(node1) + analysis.AddNode(node2) + analysis.AddNode(node3) + + // Add shard locations (all on node1) + for i := 0; i < 14; i++ { + analysis.AddShardLocation(ShardLocation{ + ShardID: i, + NodeID: "node1", + DataCenter: "dc1", + Rack: "rack1", + }) + } + + analysis.Finalize() + + // Verify counts + if analysis.TotalShards != 14 { + t.Errorf("TotalShards = %d, want 14", analysis.TotalShards) + } + if analysis.ShardsByDC["dc1"] != 14 { + t.Errorf("ShardsByDC[dc1] = %d, want 14", analysis.ShardsByDC["dc1"]) + } + if analysis.ShardsByRack["rack1"] != 14 { + t.Errorf("ShardsByRack[rack1] = %d, want 14", analysis.ShardsByRack["rack1"]) + } + if analysis.ShardsByNode["node1"] != 14 { + t.Errorf("ShardsByNode[node1] = %d, want 14", analysis.ShardsByNode["node1"]) + } + + t.Log(analysis.DetailedString()) +} + +func TestRebalancer(t *testing.T) { + // Build topology: 2 DCs, 2 racks each, all shards on one node + analysis := NewTopologyAnalysis() + + // Add nodes + nodes := []*TopologyNode{ + {NodeID: "dc1-rack1-node1", DataCenter: "dc1", Rack: "dc1-rack1", FreeSlots: 0}, + {NodeID: "dc1-rack2-node1", 
DataCenter: "dc1", Rack: "dc1-rack2", FreeSlots: 10}, + {NodeID: "dc2-rack1-node1", DataCenter: "dc2", Rack: "dc2-rack1", FreeSlots: 10}, + {NodeID: "dc2-rack2-node1", DataCenter: "dc2", Rack: "dc2-rack2", FreeSlots: 10}, + } + for _, node := range nodes { + analysis.AddNode(node) + } + + // Add all 14 shards to first node + for i := 0; i < 14; i++ { + analysis.AddShardLocation(ShardLocation{ + ShardID: i, + NodeID: "dc1-rack1-node1", + DataCenter: "dc1", + Rack: "dc1-rack1", + }) + } + analysis.Finalize() + + // Create rebalancer with 110 replication (2 DCs, 2 racks each) + ec := DefaultECConfig() + rep, _ := NewReplicationConfigFromString("110") + rebalancer := NewRebalancer(ec, rep) + + plan, err := rebalancer.PlanRebalance(analysis) + if err != nil { + t.Fatalf("PlanRebalance failed: %v", err) + } + + t.Logf("Planned %d moves", plan.TotalMoves) + t.Log(plan.DetailedString()) + + // Verify we're moving shards to dc2 + movedToDC2 := 0 + for _, move := range plan.Moves { + if move.DestNode.DataCenter == "dc2" { + movedToDC2++ + } + } + + if movedToDC2 == 0 { + t.Error("Expected some moves to dc2") + } + + // With "110" replication, target is 7 shards per DC + // Starting with 14 in dc1, should plan to move 7 to dc2 + if plan.MovesAcrossDC < 7 { + t.Errorf("Expected at least 7 cross-DC moves for 110 replication, got %d", plan.MovesAcrossDC) + } +} + +func TestCustomECRatios(t *testing.T) { + // Test various custom EC ratios that seaweed-enterprise might use + ratios := []struct { + name string + data int + parity int + }{ + {"4+2", 4, 2}, + {"6+3", 6, 3}, + {"8+2", 8, 2}, + {"8+4", 8, 4}, + {"10+4", 10, 4}, + {"12+4", 12, 4}, + {"16+4", 16, 4}, + } + + for _, ratio := range ratios { + t.Run(ratio.name, func(t *testing.T) { + ec, err := NewECConfig(ratio.data, ratio.parity) + if err != nil { + t.Fatalf("Failed to create EC config: %v", err) + } + + rep, _ := NewReplicationConfigFromString("110") + dist := CalculateDistribution(ec, rep) + + t.Logf("EC %s with replication 110:", ratio.name) + t.Logf(" Total shards: %d", ec.TotalShards()) + t.Logf(" Can lose: %d shards", ec.MaxTolerableLoss()) + t.Logf(" Target per DC: %d", dist.TargetShardsPerDC) + t.Logf(" Target per rack: %d", dist.TargetShardsPerRack) + t.Logf(" Min DCs for DC fault tolerance: %d", dist.MinDCsForDCFaultTolerance()) + + // Verify basic sanity + if dist.TargetShardsPerDC*2 < ec.TotalShards() { + t.Errorf("Target per DC (%d) * 2 should be >= total (%d)", + dist.TargetShardsPerDC, ec.TotalShards()) + } + }) + } +} + +func TestShardClassification(t *testing.T) { + ec := DefaultECConfig() // 10+4 + + // Test IsDataShard + for i := 0; i < 10; i++ { + if !ec.IsDataShard(i) { + t.Errorf("Shard %d should be a data shard", i) + } + if ec.IsParityShard(i) { + t.Errorf("Shard %d should not be a parity shard", i) + } + } + + // Test IsParityShard + for i := 10; i < 14; i++ { + if ec.IsDataShard(i) { + t.Errorf("Shard %d should not be a data shard", i) + } + if !ec.IsParityShard(i) { + t.Errorf("Shard %d should be a parity shard", i) + } + } + + // Test with custom 8+4 EC + ec84, _ := NewECConfig(8, 4) + for i := 0; i < 8; i++ { + if !ec84.IsDataShard(i) { + t.Errorf("8+4 EC: Shard %d should be a data shard", i) + } + } + for i := 8; i < 12; i++ { + if !ec84.IsParityShard(i) { + t.Errorf("8+4 EC: Shard %d should be a parity shard", i) + } + } +} + +func TestSortShardsDataFirst(t *testing.T) { + ec := DefaultECConfig() // 10+4 + + // Mixed shards: [0, 10, 5, 11, 2, 12, 7, 13] + shards := []int{0, 10, 5, 11, 2, 12, 7, 13} + sorted := 
ec.SortShardsDataFirst(shards) + + t.Logf("Original: %v", shards) + t.Logf("Sorted (data first): %v", sorted) + + // First 4 should be data shards (0, 5, 2, 7) + for i := 0; i < 4; i++ { + if !ec.IsDataShard(sorted[i]) { + t.Errorf("Position %d should be a data shard, got %d", i, sorted[i]) + } + } + + // Last 4 should be parity shards (10, 11, 12, 13) + for i := 4; i < 8; i++ { + if !ec.IsParityShard(sorted[i]) { + t.Errorf("Position %d should be a parity shard, got %d", i, sorted[i]) + } + } +} + +func TestSortShardsParityFirst(t *testing.T) { + ec := DefaultECConfig() // 10+4 + + // Mixed shards: [0, 10, 5, 11, 2, 12, 7, 13] + shards := []int{0, 10, 5, 11, 2, 12, 7, 13} + sorted := ec.SortShardsParityFirst(shards) + + t.Logf("Original: %v", shards) + t.Logf("Sorted (parity first): %v", sorted) + + // First 4 should be parity shards (10, 11, 12, 13) + for i := 0; i < 4; i++ { + if !ec.IsParityShard(sorted[i]) { + t.Errorf("Position %d should be a parity shard, got %d", i, sorted[i]) + } + } + + // Last 4 should be data shards (0, 5, 2, 7) + for i := 4; i < 8; i++ { + if !ec.IsDataShard(sorted[i]) { + t.Errorf("Position %d should be a data shard, got %d", i, sorted[i]) + } + } +} + +func TestRebalancerPrefersMovingParityShards(t *testing.T) { + // Build topology where one node has all shards including mix of data and parity + analysis := NewTopologyAnalysis() + + // Node 1: Has all 14 shards (mixed data and parity) + node1 := &TopologyNode{ + NodeID: "node1", + DataCenter: "dc1", + Rack: "rack1", + FreeSlots: 0, + } + analysis.AddNode(node1) + + // Node 2: Empty, ready to receive + node2 := &TopologyNode{ + NodeID: "node2", + DataCenter: "dc1", + Rack: "rack1", + FreeSlots: 10, + } + analysis.AddNode(node2) + + // Add all 14 shards to node1 + for i := 0; i < 14; i++ { + analysis.AddShardLocation(ShardLocation{ + ShardID: i, + NodeID: "node1", + DataCenter: "dc1", + Rack: "rack1", + }) + } + analysis.Finalize() + + // Create rebalancer + ec := DefaultECConfig() + rep, _ := NewReplicationConfigFromString("000") + rebalancer := NewRebalancer(ec, rep) + + plan, err := rebalancer.PlanRebalance(analysis) + if err != nil { + t.Fatalf("PlanRebalance failed: %v", err) + } + + t.Logf("Planned %d moves", len(plan.Moves)) + + // Check that parity shards are moved first + parityMovesFirst := 0 + dataMovesFirst := 0 + seenDataMove := false + + for _, move := range plan.Moves { + isParity := ec.IsParityShard(move.ShardID) + t.Logf("Move shard %d (parity=%v): %s -> %s", + move.ShardID, isParity, move.SourceNode.NodeID, move.DestNode.NodeID) + + if isParity && !seenDataMove { + parityMovesFirst++ + } else if !isParity { + seenDataMove = true + dataMovesFirst++ + } + } + + t.Logf("Parity moves before first data move: %d", parityMovesFirst) + t.Logf("Data moves: %d", dataMovesFirst) + + // With 10+4 EC, there are 4 parity shards + // They should be moved before data shards when possible + if parityMovesFirst < 4 && len(plan.Moves) >= 4 { + t.Logf("Note: Expected parity shards to be moved first, but got %d parity moves before data moves", parityMovesFirst) + } +} + +func TestDistributionSummary(t *testing.T) { + ec := DefaultECConfig() + rep, _ := NewReplicationConfigFromString("110") + dist := CalculateDistribution(ec, rep) + + summary := dist.Summary() + t.Log(summary) + + if len(summary) == 0 { + t.Error("Summary should not be empty") + } + + analysis := dist.FaultToleranceAnalysis() + t.Log(analysis) + + if len(analysis) == 0 { + t.Error("Fault tolerance analysis should not be empty") + } +} diff --git 
a/weed/storage/erasure_coding/distribution/rebalancer.go b/weed/storage/erasure_coding/distribution/rebalancer.go new file mode 100644 index 000000000..17fc41b23 --- /dev/null +++ b/weed/storage/erasure_coding/distribution/rebalancer.go @@ -0,0 +1,378 @@ +package distribution + +import ( + "fmt" + "slices" +) + +// ShardMove represents a planned shard move +type ShardMove struct { + ShardID int + SourceNode *TopologyNode + DestNode *TopologyNode + Reason string +} + +// String returns a human-readable description of the move +func (m ShardMove) String() string { + return fmt.Sprintf("shard %d: %s -> %s (%s)", + m.ShardID, m.SourceNode.NodeID, m.DestNode.NodeID, m.Reason) +} + +// RebalancePlan contains the complete plan for rebalancing EC shards +type RebalancePlan struct { + Moves []ShardMove + Distribution *ECDistribution + Analysis *TopologyAnalysis + + // Statistics + TotalMoves int + MovesAcrossDC int + MovesAcrossRack int + MovesWithinRack int +} + +// String returns a summary of the plan +func (p *RebalancePlan) String() string { + return fmt.Sprintf("RebalancePlan{moves:%d, acrossDC:%d, acrossRack:%d, withinRack:%d}", + p.TotalMoves, p.MovesAcrossDC, p.MovesAcrossRack, p.MovesWithinRack) +} + +// DetailedString returns a detailed multi-line summary +func (p *RebalancePlan) DetailedString() string { + s := fmt.Sprintf("Rebalance Plan:\n") + s += fmt.Sprintf(" Total Moves: %d\n", p.TotalMoves) + s += fmt.Sprintf(" Across DC: %d\n", p.MovesAcrossDC) + s += fmt.Sprintf(" Across Rack: %d\n", p.MovesAcrossRack) + s += fmt.Sprintf(" Within Rack: %d\n", p.MovesWithinRack) + s += fmt.Sprintf("\nMoves:\n") + for i, move := range p.Moves { + s += fmt.Sprintf(" %d. %s\n", i+1, move.String()) + } + return s +} + +// Rebalancer plans shard moves to achieve proportional distribution +type Rebalancer struct { + ecConfig ECConfig + repConfig ReplicationConfig +} + +// NewRebalancer creates a new rebalancer with the given configuration +func NewRebalancer(ec ECConfig, rep ReplicationConfig) *Rebalancer { + return &Rebalancer{ + ecConfig: ec, + repConfig: rep, + } +} + +// PlanRebalance creates a rebalancing plan based on current topology analysis +func (r *Rebalancer) PlanRebalance(analysis *TopologyAnalysis) (*RebalancePlan, error) { + dist := CalculateDistribution(r.ecConfig, r.repConfig) + + plan := &RebalancePlan{ + Distribution: dist, + Analysis: analysis, + } + + // Step 1: Balance across data centers + dcMoves := r.planDCMoves(analysis, dist) + for _, move := range dcMoves { + plan.Moves = append(plan.Moves, move) + plan.MovesAcrossDC++ + } + + // Update analysis after DC moves (for planning purposes) + r.applyMovesToAnalysis(analysis, dcMoves) + + // Step 2: Balance across racks within each DC + rackMoves := r.planRackMoves(analysis, dist) + for _, move := range rackMoves { + plan.Moves = append(plan.Moves, move) + plan.MovesAcrossRack++ + } + + // Update analysis after rack moves + r.applyMovesToAnalysis(analysis, rackMoves) + + // Step 3: Balance across nodes within each rack + nodeMoves := r.planNodeMoves(analysis, dist) + for _, move := range nodeMoves { + plan.Moves = append(plan.Moves, move) + plan.MovesWithinRack++ + } + + plan.TotalMoves = len(plan.Moves) + + return plan, nil +} + +// planDCMoves plans moves to balance shards across data centers +func (r *Rebalancer) planDCMoves(analysis *TopologyAnalysis, dist *ECDistribution) []ShardMove { + var moves []ShardMove + + overDCs := CalculateDCExcess(analysis, dist) + underDCs := CalculateUnderservedDCs(analysis, dist) + + underIdx := 0 
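+	// Greedy pairing: drain shards from DCs above target into underserved DCs,
+	// advancing underIdx once a destination DC reaches its per-DC target (or has no usable node).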
+ for _, over := range overDCs { + for over.Excess > 0 && underIdx < len(underDCs) { + destDC := underDCs[underIdx] + + // Find a shard and source node + shardID, srcNode := r.pickShardToMove(analysis, over.Nodes) + if srcNode == nil { + break + } + + // Find destination node in target DC + destNode := r.pickBestDestination(analysis, destDC, "", dist) + if destNode == nil { + underIdx++ + continue + } + + moves = append(moves, ShardMove{ + ShardID: shardID, + SourceNode: srcNode, + DestNode: destNode, + Reason: fmt.Sprintf("balance DC: %s -> %s", srcNode.DataCenter, destDC), + }) + + over.Excess-- + analysis.ShardsByDC[srcNode.DataCenter]-- + analysis.ShardsByDC[destDC]++ + + // Check if destDC reached target + if analysis.ShardsByDC[destDC] >= dist.TargetShardsPerDC { + underIdx++ + } + } + } + + return moves +} + +// planRackMoves plans moves to balance shards across racks within each DC +func (r *Rebalancer) planRackMoves(analysis *TopologyAnalysis, dist *ECDistribution) []ShardMove { + var moves []ShardMove + + for dc := range analysis.DCToRacks { + dcShards := analysis.ShardsByDC[dc] + numRacks := len(analysis.DCToRacks[dc]) + if numRacks == 0 { + continue + } + + targetPerRack := ceilDivide(dcShards, max(numRacks, dist.ReplicationConfig.MinRacksPerDC)) + + overRacks := CalculateRackExcess(analysis, dc, targetPerRack) + underRacks := CalculateUnderservedRacks(analysis, dc, targetPerRack) + + underIdx := 0 + for _, over := range overRacks { + for over.Excess > 0 && underIdx < len(underRacks) { + destRack := underRacks[underIdx] + + // Find shard and source node + shardID, srcNode := r.pickShardToMove(analysis, over.Nodes) + if srcNode == nil { + break + } + + // Find destination node in target rack + destNode := r.pickBestDestination(analysis, dc, destRack, dist) + if destNode == nil { + underIdx++ + continue + } + + moves = append(moves, ShardMove{ + ShardID: shardID, + SourceNode: srcNode, + DestNode: destNode, + Reason: fmt.Sprintf("balance rack: %s -> %s", srcNode.Rack, destRack), + }) + + over.Excess-- + analysis.ShardsByRack[srcNode.Rack]-- + analysis.ShardsByRack[destRack]++ + + if analysis.ShardsByRack[destRack] >= targetPerRack { + underIdx++ + } + } + } + } + + return moves +} + +// planNodeMoves plans moves to balance shards across nodes within each rack +func (r *Rebalancer) planNodeMoves(analysis *TopologyAnalysis, dist *ECDistribution) []ShardMove { + var moves []ShardMove + + for rack, nodes := range analysis.RackToNodes { + if len(nodes) <= 1 { + continue + } + + rackShards := analysis.ShardsByRack[rack] + targetPerNode := ceilDivide(rackShards, max(len(nodes), dist.ReplicationConfig.MinNodesPerRack)) + + // Find over and under nodes + var overNodes []*TopologyNode + var underNodes []*TopologyNode + + for _, node := range nodes { + count := analysis.ShardsByNode[node.NodeID] + if count > targetPerNode { + overNodes = append(overNodes, node) + } else if count < targetPerNode { + underNodes = append(underNodes, node) + } + } + + // Sort by excess/deficit + slices.SortFunc(overNodes, func(a, b *TopologyNode) int { + return analysis.ShardsByNode[b.NodeID] - analysis.ShardsByNode[a.NodeID] + }) + + underIdx := 0 + for _, srcNode := range overNodes { + excess := analysis.ShardsByNode[srcNode.NodeID] - targetPerNode + + for excess > 0 && underIdx < len(underNodes) { + destNode := underNodes[underIdx] + + // Pick a shard from this node, preferring parity shards + shards := analysis.NodeToShards[srcNode.NodeID] + if len(shards) == 0 { + break + } + + // Find a parity shard 
first, fallback to data shard + shardID := -1 + shardIdx := -1 + for i, s := range shards { + if r.ecConfig.IsParityShard(s) { + shardID = s + shardIdx = i + break + } + } + if shardID == -1 { + shardID = shards[0] + shardIdx = 0 + } + + moves = append(moves, ShardMove{ + ShardID: shardID, + SourceNode: srcNode, + DestNode: destNode, + Reason: fmt.Sprintf("balance node: %s -> %s", srcNode.NodeID, destNode.NodeID), + }) + + excess-- + analysis.ShardsByNode[srcNode.NodeID]-- + analysis.ShardsByNode[destNode.NodeID]++ + + // Update shard lists - remove the specific shard we picked + analysis.NodeToShards[srcNode.NodeID] = append( + shards[:shardIdx], shards[shardIdx+1:]...) + analysis.NodeToShards[destNode.NodeID] = append( + analysis.NodeToShards[destNode.NodeID], shardID) + + if analysis.ShardsByNode[destNode.NodeID] >= targetPerNode { + underIdx++ + } + } + } + } + + return moves +} + +// pickShardToMove selects a shard and its node from the given nodes. +// It prefers to move parity shards first, keeping data shards spread out +// since data shards serve read requests while parity shards are only for reconstruction. +func (r *Rebalancer) pickShardToMove(analysis *TopologyAnalysis, nodes []*TopologyNode) (int, *TopologyNode) { + // Sort by shard count (most shards first) + slices.SortFunc(nodes, func(a, b *TopologyNode) int { + return analysis.ShardsByNode[b.NodeID] - analysis.ShardsByNode[a.NodeID] + }) + + // First pass: try to find a parity shard to move (prefer moving parity) + for _, node := range nodes { + shards := analysis.NodeToShards[node.NodeID] + for _, shardID := range shards { + if r.ecConfig.IsParityShard(shardID) { + return shardID, node + } + } + } + + // Second pass: if no parity shards, move a data shard + for _, node := range nodes { + shards := analysis.NodeToShards[node.NodeID] + if len(shards) > 0 { + return shards[0], node + } + } + + return -1, nil +} + +// pickBestDestination selects the best destination node +func (r *Rebalancer) pickBestDestination(analysis *TopologyAnalysis, targetDC, targetRack string, dist *ECDistribution) *TopologyNode { + var candidates []*TopologyNode + + // Collect candidates + for _, node := range analysis.AllNodes { + // Filter by DC if specified + if targetDC != "" && node.DataCenter != targetDC { + continue + } + // Filter by rack if specified + if targetRack != "" && node.Rack != targetRack { + continue + } + // Check capacity + if node.FreeSlots <= 0 { + continue + } + // Check max shards limit + if analysis.ShardsByNode[node.NodeID] >= dist.MaxShardsPerNode { + continue + } + + candidates = append(candidates, node) + } + + if len(candidates) == 0 { + return nil + } + + // Sort by: 1) fewer shards, 2) more free slots + slices.SortFunc(candidates, func(a, b *TopologyNode) int { + aShards := analysis.ShardsByNode[a.NodeID] + bShards := analysis.ShardsByNode[b.NodeID] + if aShards != bShards { + return aShards - bShards + } + return b.FreeSlots - a.FreeSlots + }) + + return candidates[0] +} + +// applyMovesToAnalysis is a no-op placeholder for potential future use. +// Note: All planners (planDCMoves, planRackMoves, planNodeMoves) update +// their respective counts (ShardsByDC, ShardsByRack, ShardsByNode) and +// shard lists (NodeToShards) inline during planning. This avoids duplicate +// updates that would occur if we also updated counts here. +func (r *Rebalancer) applyMovesToAnalysis(analysis *TopologyAnalysis, moves []ShardMove) { + // Counts are already updated by the individual planners. 
+ // This function is kept for API compatibility and potential future use. +} +