
fix: EC rebalance fails with replica placement 000 (#7812)

* fix: EC rebalance fails with replica placement 000

This PR fixes several issues with EC shard distribution:

1. Pre-flight check before EC encoding
   - Verify target disk type has capacity before encoding starts
   - Prevents encoding shards only to fail during rebalance
   - Shows a helpful error when the wrong diskType is specified (e.g., ssd when volumes are on hdd)

2. Fix EC rebalance with replica placement 000
   - When DiffRackCount=0, shards should be distributed freely across racks
   - The '000' placement means 'no volume replication needed' because EC provides redundancy
   - Previously all racks were skipped with the error 'shards X > replica placement limit (0)' (see the guard sketch after this list)

3. Add unit tests for EC rebalance slot calculation
   - TestECRebalanceWithLimitedSlots: documents the limited slots scenario
   - TestECRebalanceZeroFreeSlots: reproduces the 0 free slots error

4. Add Makefile for manual EC testing
   - make setup: start cluster and populate data
   - make shell: open weed shell for EC commands
   - make clean: stop cluster and cleanup
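As a rough illustration of the guard change in item 2 (the actual change is in weed/shell/command_ec_common.go below), the rack-limit check now only applies when DiffRackCount is positive. The snippet is a self-contained sketch of that logic, not the production code:

```go
package main

import "fmt"

// rackWithinPlacementLimit is a simplified sketch of the fixed condition in
// pickRackToBalanceShardsInto: the DiffRackCount limit is enforced only when
// it is greater than zero.
func rackWithinPlacementLimit(diffRackCount, shardsOnRack int) bool {
	if diffRackCount > 0 && shardsOnRack > diffRackCount {
		return false // rack already exceeds the replica placement limit
	}
	return true // "000" (diffRackCount == 0) imposes no per-rack limit on EC shards
}

func main() {
	fmt.Println(rackWithinPlacementLimit(0, 1)) // true: previously this case skipped every rack
	fmt.Println(rackWithinPlacementLimit(1, 2)) // false: non-zero placements are still enforced
}
```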

* fix: default -rebalance to true for ec.encode

The -rebalance flag was defaulting to false, which meant ec.encode would
only print shard moves but not actually execute them. This is a poor
default since the whole point of EC encoding is to distribute shards
across servers for fault tolerance.

Now -rebalance defaults to true, so shards are actually distributed
after encoding. Users can use -rebalance=false if they only want to
see what would happen without making changes.

* test/erasure_coding: improve Makefile safety and docs

- Narrow pkill pattern for volume servers to use TEST_DIR instead of
  port pattern, avoiding accidental kills of unrelated SeaweedFS processes
- Document external dependencies (curl, jq) in header comments

* shell: refactor buildRackWithEcShards to reuse buildEcShards

Extract common shard bit construction logic to avoid duplication
between buildEcShards and buildRackWithEcShards helper functions.
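For context, a purely hypothetical sketch of the shape such a rack helper can take once it delegates to buildEcShards (the test helper added in weed/shell/ec_rebalance_slots_test.go). The field names mirror the master_pb structures used by the new tests, but the actual helper may look different:

```go
package shell

import (
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
)

// buildRackWithEcShards (hypothetical shape): a rack with a single data node whose
// EC shard bits come from the shared buildEcShards helper instead of duplicated code.
func buildRackWithEcShards(rackId, nodeId string, volumeIds []uint32) *master_pb.RackInfo {
	return &master_pb.RackInfo{
		Id: rackId,
		DataNodeInfos: []*master_pb.DataNodeInfo{{
			Id: nodeId,
			DiskInfos: map[string]*master_pb.DiskInfo{
				string(types.HardDriveType): {
					Type:           string(types.HardDriveType),
					MaxVolumeCount: 10, // matches the other test fixtures
					EcShardInfos:   buildEcShards(volumeIds),
				},
			},
		}},
	}
}
```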

* shell: update test for EC replication 000 behavior

When DiffRackCount=0 (replication "000"), EC shards should be
distributed freely across racks since erasure coding provides its
own redundancy. Update test expectation to reflect this behavior.

* erasure_coding: add distribution package for proportional EC shard placement

Add a new reusable package for EC shard distribution that:
- Supports configurable EC ratios (not hard-coded 10+4)
- Distributes shards proportionally based on replication policy
- Provides fault tolerance analysis
- Prefers moving parity shards to keep data shards spread out

Key components:
- ECConfig: Configurable data/parity shard counts
- ReplicationConfig: Parsed XYZ replication policy
- ECDistribution: Target shard counts per DC/rack/node
- Rebalancer: Plans shard moves with parity-first strategy

This enables custom EC ratios in seaweed-enterprise and weed worker
integration while maintaining a clean, testable architecture.
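A short usage sketch, pieced together from the calls exercised in the new tests (exact output is whatever Summary and FaultToleranceAnalysis format):

```go
package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/distribution"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
)

func main() {
	// "110": spread across 2 data centers (7 of the 14 default-scheme shards per DC,
	// per the test expectations below).
	rp, err := super_block.NewReplicaPlacementFromString("110")
	if err != nil {
		panic(err)
	}

	ec := distribution.DefaultECConfig() // standard 10+4; NewECConfig(8, 4) for a custom ratio
	rep := distribution.NewReplicationConfig(rp)

	dist := distribution.CalculateDistribution(ec, rep)
	fmt.Println(dist.Summary())
	fmt.Println(dist.FaultToleranceAnalysis())
}
```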

* shell: integrate distribution package for EC rebalancing

Add shell wrappers around the distribution package:
- ProportionalECRebalancer: Plans moves using distribution.Rebalancer
- NewProportionalECRebalancerWithConfig: Supports custom EC configs
- GetDistributionSummary/GetFaultToleranceAnalysis: Helper functions

The shell layer converts between EcNode types and the generic
TopologyNode types used by the distribution package.
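A rough usage sketch, modeled on TestProportionalRebalancerShell and using the newEcNode/addEcVolumeAndShardsForTest test helpers (so it would live in a _test file of package shell); a real caller would pass nodes collected from the live topology instead:

```go
package shell

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
)

func exampleProportionalRebalance() error {
	// Volume 1 starts with all 14 shards on one node; the other nodes are empty.
	source := newEcNode("dc1", "rack1", "dn1", 0).
		addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13})
	nodes := []*EcNode{
		source,
		newEcNode("dc1", "rack2", "dn2", 10),
		newEcNode("dc2", "rack1", "dn3", 10),
		newEcNode("dc2", "rack2", "dn4", 10),
	}

	rp, err := super_block.NewReplicaPlacementFromString("110")
	if err != nil {
		return err
	}

	// Plan (but do not execute) the moves needed for a proportional layout.
	rebalancer := NewProportionalECRebalancer(nodes, rp, types.HardDriveType)
	moves, err := rebalancer.PlanMoves(needle.VolumeId(1), []*EcNode{source})
	if err != nil {
		return err
	}
	for _, m := range moves {
		fmt.Println(m.String())
	}
	return nil
}
```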

* test setup

* ec: improve data and parity shard distribution across racks

- Add shardsByTypePerRack helper to track data vs parity shards
- Rewrite doBalanceEcShardsAcrossRacks for two-pass balancing:
  1. Balance data shards (0-9) evenly, max ceil(10/6)=2 per rack
  2. Balance parity shards (10-13) evenly, max ceil(4/6)=1 per rack
- Add balanceShardTypeAcrossRacks for generic shard type balancing
- Add pickRackForShardType to select destination with room for type
- Add unit tests for even data/parity distribution verification

This ensures even read load during normal operation by spreading
both data and parity shards across all available racks.
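The per-rack caps are just a ceiling division over the rack count; this small sketch re-declares ceilDivide locally only to show the arithmetic (the shell package already has its own helper):

```go
package main

import "fmt"

// ceilDivide is re-declared here purely for illustration.
func ceilDivide(a, b int) int {
	return (a + b - 1) / b
}

func main() {
	dataShards, parityShards, racks := 10, 4, 6
	fmt.Println(ceilDivide(dataShards, racks))   // 2: at most 2 data shards per rack
	fmt.Println(ceilDivide(parityShards, racks)) // 1: at most 1 parity shard per rack
}
```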

* ec: make data/parity shard counts configurable in ecBalancer

- Add dataShardCount and parityShardCount fields to ecBalancer struct
- Add getDataShardCount() and getParityShardCount() methods with defaults
- Replace direct constant usage with configurable methods
- Fix unused variable warning for parityPerRack

This allows seaweed-enterprise to use custom EC ratios while
defaulting to the standard 10+4 scheme.

* Address PR 7812 review comments

Makefile improvements:
- Save PIDs for each volume server for precise termination
- Use PID-based killing in stop target with pkill fallback
- Use more specific pkill patterns with TEST_DIR paths

Documentation:
- Document jq dependency in README.md

Rebalancer fix:
- Fix duplicate shard count updates in applyMovesToAnalysis
- All planners (DC/rack/node) update counts inline during planning
- Remove duplicate updates from applyMovesToAnalysis to avoid double-counting

* test/erasure_coding: use mktemp for test file template

Use mktemp instead of hardcoded /tmp/testfile_template.bin path
to provide better isolation for concurrent test runs.
Committed by Chris Lu (via GitHub), commit 4aa50bfa6a.
16 changed files (lines changed, path):

    1  .gitignore
  187  test/erasure_coding/Makefile
   37  test/erasure_coding/README.md
  194  weed/shell/command_ec_common.go
    4  weed/shell/command_ec_common_test.go
   28  weed/shell/command_ec_encode.go
  149  weed/shell/command_ec_test.go
  284  weed/shell/ec_proportional_rebalance.go
  251  weed/shell/ec_proportional_rebalance_test.go
  293  weed/shell/ec_rebalance_slots_test.go
  209  weed/storage/erasure_coding/distribution/README.md
  241  weed/storage/erasure_coding/distribution/analysis.go
  171  weed/storage/erasure_coding/distribution/config.go
  161  weed/storage/erasure_coding/distribution/distribution.go
  565  weed/storage/erasure_coding/distribution/distribution_test.go
  378  weed/storage/erasure_coding/distribution/rebalancer.go

1 .gitignore

@@ -130,3 +130,4 @@ coverage.out
/test/s3/remote_cache/test-remote-data
test/s3/remote_cache/remote-server.pid
test/s3/remote_cache/primary-server.pid
/test/erasure_coding/filerldb2

187 test/erasure_coding/Makefile

@@ -0,0 +1,187 @@
# Makefile for EC integration testing
# Usage:
# make start - Start the test cluster (master + 6 volume servers + filer)
# make stop - Stop the test cluster
# make populate - Populate test data (~300MB across 7 volumes)
# make shell - Open weed shell connected to the test cluster
# make clean - Stop cluster and remove all test data
# make setup - Start cluster and populate data (one command)
#
# Requirements: curl, jq
WEED_BINARY := $(shell pwd)/../../weed/weed
TEST_DIR := /tmp/ec_manual_test
# Use non-standard ports to avoid conflicts with existing SeaweedFS servers
MASTER_PORT := 29333
FILER_PORT := 28888
VOLUME_BASE_PORT := 28080
NUM_VOLUME_SERVERS := 6
VOLUME_SIZE_LIMIT_MB := 30
MAX_VOLUMES_PER_SERVER := 10
# Build weed binary if it doesn't exist
$(WEED_BINARY):
cd ../../weed && go build -o weed .
.PHONY: build
build: $(WEED_BINARY)
.PHONY: start
start: build
@echo "=== Starting SeaweedFS test cluster ==="
@mkdir -p $(TEST_DIR)/master $(TEST_DIR)/filer
@for i in $$(seq 0 $$(($(NUM_VOLUME_SERVERS)-1))); do mkdir -p $(TEST_DIR)/volume$$i; done
@# Create security.toml with JWT disabled
@echo "# Disable JWT for testing" > $(TEST_DIR)/security.toml
@echo '[jwt.signing]' >> $(TEST_DIR)/security.toml
@echo 'key = ""' >> $(TEST_DIR)/security.toml
@echo 'expires_after_seconds = 0' >> $(TEST_DIR)/security.toml
@echo '' >> $(TEST_DIR)/security.toml
@echo '[jwt.signing.read]' >> $(TEST_DIR)/security.toml
@echo 'key = ""' >> $(TEST_DIR)/security.toml
@echo 'expires_after_seconds = 0' >> $(TEST_DIR)/security.toml
@# Create filer.toml with leveldb2
@echo '[leveldb2]' > $(TEST_DIR)/filer.toml
@echo 'enabled = true' >> $(TEST_DIR)/filer.toml
@echo 'dir = "$(TEST_DIR)/filer/filerldb2"' >> $(TEST_DIR)/filer.toml
@# Start master
@echo "Starting master on port $(MASTER_PORT)..."
@cd $(TEST_DIR) && $(WEED_BINARY) master \
-port=$(MASTER_PORT) \
-mdir=$(TEST_DIR)/master \
-volumeSizeLimitMB=$(VOLUME_SIZE_LIMIT_MB) \
-ip=127.0.0.1 \
> $(TEST_DIR)/master/master.log 2>&1 & echo $$! > $(TEST_DIR)/master.pid
@sleep 3
@# Start volume servers (run from TEST_DIR to find security.toml)
@for i in $$(seq 0 $$(($(NUM_VOLUME_SERVERS)-1))); do \
port=$$(($(VOLUME_BASE_PORT) + $$i)); \
echo "Starting volume server $$i on port $$port (rack$$i)..."; \
cd $(TEST_DIR) && $(WEED_BINARY) volume \
-port=$$port \
-dir=$(TEST_DIR)/volume$$i \
-max=$(MAX_VOLUMES_PER_SERVER) \
-master=127.0.0.1:$(MASTER_PORT) \
-ip=127.0.0.1 \
-dataCenter=dc1 \
-rack=rack$$i \
> $(TEST_DIR)/volume$$i/volume.log 2>&1 & echo $$! > $(TEST_DIR)/volume$$i.pid; \
done
@sleep 3
@# Start filer (run from TEST_DIR to find security.toml)
@echo "Starting filer on port $(FILER_PORT)..."
@cd $(TEST_DIR) && $(WEED_BINARY) filer \
-port=$(FILER_PORT) \
-master=127.0.0.1:$(MASTER_PORT) \
-ip=127.0.0.1 \
> $(TEST_DIR)/filer/filer.log 2>&1 & echo $$! > $(TEST_DIR)/filer.pid
@sleep 3
@echo ""
@echo "=== Cluster started ==="
@echo "Master: http://127.0.0.1:$(MASTER_PORT)"
@echo "Filer: http://127.0.0.1:$(FILER_PORT)"
@echo "Volume servers: http://127.0.0.1:$(VOLUME_BASE_PORT) - http://127.0.0.1:$$(($(VOLUME_BASE_PORT) + $(NUM_VOLUME_SERVERS) - 1))"
@echo ""
@echo "Run 'make shell' to open weed shell"
@echo "Run 'make populate' to add test data"
.PHONY: stop
stop:
@echo "=== Stopping SeaweedFS test cluster ==="
@# Stop filer by PID
@-[ -f $(TEST_DIR)/filer.pid ] && kill $$(cat $(TEST_DIR)/filer.pid) 2>/dev/null && rm -f $(TEST_DIR)/filer.pid || true
@# Stop volume servers by PID
@for i in $$(seq 0 $$(($(NUM_VOLUME_SERVERS)-1))); do \
[ -f $(TEST_DIR)/volume$$i.pid ] && kill $$(cat $(TEST_DIR)/volume$$i.pid) 2>/dev/null && rm -f $(TEST_DIR)/volume$$i.pid || true; \
done
@# Stop master by PID
@-[ -f $(TEST_DIR)/master.pid ] && kill $$(cat $(TEST_DIR)/master.pid) 2>/dev/null && rm -f $(TEST_DIR)/master.pid || true
@# Fallback: use pkill with specific patterns to ensure cleanup
@-pkill -f "weed filer.*-master=127.0.0.1:$(MASTER_PORT)" 2>/dev/null || true
@-pkill -f "weed volume.*-dir=$(TEST_DIR)/volume" 2>/dev/null || true
@-pkill -f "weed master.*-mdir=$(TEST_DIR)/master" 2>/dev/null || true
@echo "Cluster stopped."
.PHONY: clean
clean: stop
@echo "Removing test data..."
@rm -rf $(TEST_DIR)
@echo "Clean complete."
.PHONY: populate
populate:
@echo "=== Populating test data (~300MB) ==="
@# Create a 500KB test file template using mktemp for isolation
@tmpfile=$$(mktemp) && \
dd if=/dev/urandom bs=1024 count=500 of=$$tmpfile 2>/dev/null && \
uploaded=0; \
for i in $$(seq 1 600); do \
response=$$(curl -s "http://127.0.0.1:$(MASTER_PORT)/dir/assign?collection=ectest&replication=000"); \
fid=$$(echo $$response | jq -r '.fid'); \
url=$$(echo $$response | jq -r '.url'); \
if [ "$$fid" != "null" ] && [ -n "$$fid" ]; then \
curl -s -F "file=@$$tmpfile;filename=file_$$i.bin" "http://$$url/$$fid" > /dev/null; \
uploaded=$$((uploaded + 1)); \
fi; \
if [ $$((i % 100)) -eq 0 ]; then \
echo "Uploaded $$uploaded files..."; \
fi; \
done; \
rm -f $$tmpfile; \
echo ""; \
echo "=== Data population complete ==="; \
echo "Uploaded $$uploaded files (~$$((uploaded * 500 / 1024))MB)"
@echo ""
@echo "Volume status:"
@curl -s "http://127.0.0.1:$(MASTER_PORT)/vol/status" | jq -r \
'.Volumes.DataCenters.dc1 | to_entries[] | .key as $$rack | .value | to_entries[] | select(.value != null) | .key as $$server | .value[] | select(.Collection == "ectest") | " Volume \(.Id): \(.FileCount) files, \((.Size/1048576*10|floor)/10)MB - \($$rack)"' 2>/dev/null || true
.PHONY: shell
shell: build
@echo "Opening weed shell..."
@echo "Commands to try:"
@echo " lock"
@echo " volume.list"
@echo " ec.encode -collection=ectest -quietFor=1s -force"
@echo " ec.balance -collection=ectest"
@echo " unlock"
@echo ""
@$(WEED_BINARY) shell -master=127.0.0.1:$(MASTER_PORT) -filer=127.0.0.1:$(FILER_PORT)
.PHONY: setup
setup: clean start
@sleep 2
@$(MAKE) populate
.PHONY: status
status:
@echo "=== Cluster Status ==="
@curl -s "http://127.0.0.1:$(MASTER_PORT)/vol/status" | jq -r \
'.Volumes.DataCenters.dc1 | to_entries[] | .key as $$rack | .value | to_entries[] | select(.value != null) | .key as $$server | .value[] | select(.Collection == "ectest") | "Volume \(.Id): \(.FileCount) files, \((.Size/1048576*10|floor)/10)MB - \($$rack) (\($$server))"' 2>/dev/null | sort -t: -k1 -n || echo "Cluster not running"
@echo ""
@echo "=== EC Shards ==="
@for i in $$(seq 0 $$(($(NUM_VOLUME_SERVERS)-1))); do \
count=$$(ls $(TEST_DIR)/volume$$i/*.ec[0-9]* 2>/dev/null | wc -l | tr -d ' '); \
if [ "$$count" != "0" ]; then \
echo " volume$$i (port $$(($(VOLUME_BASE_PORT) + $$i))): $$count EC shard files"; \
fi; \
done
.PHONY: help
help:
@echo "EC Integration Test Makefile"
@echo ""
@echo "Targets:"
@echo " make start - Start test cluster (master + 6 volume servers + filer)"
@echo " make stop - Stop test cluster"
@echo " make populate - Populate ~300MB of test data"
@echo " make shell - Open weed shell"
@echo " make setup - Clean, start, and populate (all-in-one)"
@echo " make status - Show cluster and EC shard status"
@echo " make clean - Stop cluster and remove all test data"
@echo " make help - Show this help"
@echo ""
@echo "Quick start:"
@echo " make setup # Start cluster and populate data"
@echo " make shell # Open shell to run EC commands"

37 test/erasure_coding/README.md

@@ -78,6 +78,43 @@ go test -v -run TestECEncodingMasterTimingRaceCondition
go test -v -short
```
## Manual Testing with Makefile
A Makefile is provided for manual EC testing.
**Requirements:** `curl`, `jq` (command-line JSON processor)
```bash
# Quick start: start cluster and populate data
make setup
# Open weed shell to run EC commands
make shell
# Individual targets
make start # Start test cluster (master + 6 volume servers + filer)
make stop # Stop test cluster
make populate # Populate ~300MB of test data
make status # Show cluster and EC shard status
make clean # Stop cluster and remove all test data
make help # Show all targets
```
### EC Rebalance Limited Slots (Unit Test)
The "no free ec shard slots" issue is tested with a **unit test** that works directly on
topology data structures without requiring a running cluster.
**Location**: `weed/shell/ec_rebalance_slots_test.go`
Tests included:
- `TestECRebalanceWithLimitedSlots`: Tests a topology with 6 servers, 7 EC volumes (98 shards)
- `TestECRebalanceZeroFreeSlots`: Reproduces the exact 0 free slots scenario
**Known Issue**: When volume servers are at capacity (`volumeCount == maxVolumeCount`),
the rebalance step fails with "no free ec shard slots" instead of recognizing that
moving shards frees slots on source servers.
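To make the issue concrete, the slot arithmetic as described in the test comments (the real calculation in SeaweedFS may include additional terms) already yields zero before any move is attempted:

```go
package main

import "fmt"

func main() {
	maxVolumes := 10     // each test volume server runs with -max=10
	volumeCount := 10    // the original volumes still count against capacity
	slotsPerVolume := 10 // EC shard slots per free volume slot, per the test comment

	freeEcSlots := (maxVolumes - volumeCount) * slotsPerVolume
	fmt.Printf("free EC slots: %d\n", freeEcSlots) // 0 -> "no free ec shard slots. only 0 left"
}
```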
## Test Results
**With the fix**: Shows "Collecting volume locations for N volumes before EC encoding..." message

194 weed/shell/command_ec_common.go

@@ -679,6 +679,25 @@ type ecBalancer struct {
applyBalancing bool
maxParallelization int
diskType types.DiskType // target disk type for EC shards (default: HardDriveType)
// EC configuration for shard distribution (defaults to 10+4)
dataShardCount int
parityShardCount int
}
// getDataShardCount returns the configured data shard count, defaulting to standard 10
func (ecb *ecBalancer) getDataShardCount() int {
if ecb.dataShardCount > 0 {
return ecb.dataShardCount
}
return erasure_coding.DataShardsCount
}
// getParityShardCount returns the configured parity shard count, defaulting to standard 4
func (ecb *ecBalancer) getParityShardCount() int {
if ecb.parityShardCount > 0 {
return ecb.parityShardCount
}
return erasure_coding.ParityShardsCount
}
func (ecb *ecBalancer) errorWaitGroup() *ErrorWaitGroup {
@@ -785,59 +804,176 @@ func countShardsByRack(vid needle.VolumeId, locations []*EcNode, diskType types.
})
}
// shardsByTypePerRack counts data shards (< dataShards) and parity shards (>= dataShards) per rack
func shardsByTypePerRack(vid needle.VolumeId, locations []*EcNode, diskType types.DiskType, dataShards int) (dataPerRack, parityPerRack map[string][]erasure_coding.ShardId) {
dataPerRack = make(map[string][]erasure_coding.ShardId)
parityPerRack = make(map[string][]erasure_coding.ShardId)
for _, ecNode := range locations {
shardBits := findEcVolumeShards(ecNode, vid, diskType)
rackId := string(ecNode.rack)
for _, shardId := range shardBits.ShardIds() {
if int(shardId) < dataShards {
dataPerRack[rackId] = append(dataPerRack[rackId], shardId)
} else {
parityPerRack[rackId] = append(parityPerRack[rackId], shardId)
}
}
}
return
}
func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needle.VolumeId, locations []*EcNode) error {
racks := ecb.racks()
numRacks := len(racks)
// Use configured EC scheme for shard type classification (defaults to 10+4)
dataShardCount := ecb.getDataShardCount()
parityShardCount := ecb.getParityShardCount()
// Get current distribution of data shards per rack (parity computed after data balancing)
dataPerRack, _ := shardsByTypePerRack(vid, locations, ecb.diskType, dataShardCount)
// Calculate max shards per rack for each type to ensure even spread
// Data: 10 shards / 6 racks = max 2 per rack
// Parity: 4 shards / 6 racks = max 1 per rack (with 2 racks having 0)
maxDataPerRack := ceilDivide(dataShardCount, numRacks)
maxParityPerRack := ceilDivide(parityShardCount, numRacks)
// see the volume's shards are in how many racks, and how many in each rack
rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
return string(ecNode.rack)
})
// Track total shard count per rack for slot management
rackToShardCount := countShardsByRack(vid, locations, ecb.diskType)
// Calculate actual total shards for this volume (not hardcoded default)
var totalShardsForVolume int
for _, count := range rackToShardCount {
totalShardsForVolume += count
// First pass: Balance data shards across racks
if err := ecb.balanceShardTypeAcrossRacks(collection, vid, racks, rackEcNodesWithVid, dataPerRack, rackToShardCount, maxDataPerRack, "data"); err != nil {
return err
}
// calculate average number of shards an ec rack should have for one volume
averageShardsPerEcRack := ceilDivide(totalShardsForVolume, len(racks))
rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
// Refresh locations after data shard moves and get parity distribution
locations = ecb.collectVolumeIdToEcNodes(collection)[vid]
_, parityPerRack := shardsByTypePerRack(vid, locations, ecb.diskType, dataShardCount)
rackEcNodesWithVid = groupBy(locations, func(ecNode *EcNode) string {
return string(ecNode.rack)
})
rackToShardCount = countShardsByRack(vid, locations, ecb.diskType)
// Second pass: Balance parity shards across racks
if err := ecb.balanceShardTypeAcrossRacks(collection, vid, racks, rackEcNodesWithVid, parityPerRack, rackToShardCount, maxParityPerRack, "parity"); err != nil {
return err
}
return nil
}
// ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
ecShardsToMove := make(map[erasure_coding.ShardId]*EcNode)
for rackId, count := range rackToShardCount {
if count <= averageShardsPerEcRack {
// balanceShardTypeAcrossRacks spreads shards of a specific type (data or parity) evenly across racks
func (ecb *ecBalancer) balanceShardTypeAcrossRacks(
collection string,
vid needle.VolumeId,
racks map[RackId]*EcRack,
rackEcNodesWithVid map[string][]*EcNode,
shardsPerRack map[string][]erasure_coding.ShardId,
rackToShardCount map[string]int,
maxPerRack int,
shardType string,
) error {
// Find racks with too many shards of this type
shardsToMove := make(map[erasure_coding.ShardId]*EcNode)
for rackId, shards := range shardsPerRack {
if len(shards) <= maxPerRack {
continue
}
possibleEcNodes := rackEcNodesWithVid[rackId]
for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack, ecb.diskType) {
ecShardsToMove[shardId] = ecNode
// Pick excess shards to move
excess := len(shards) - maxPerRack
ecNodes := rackEcNodesWithVid[rackId]
for i := 0; i < excess && i < len(shards); i++ {
shardId := shards[i]
// Find which node has this shard
for _, ecNode := range ecNodes {
shardBits := findEcVolumeShards(ecNode, vid, ecb.diskType)
if shardBits.HasShardId(shardId) {
shardsToMove[shardId] = ecNode
break
}
}
} }
}
for shardId, ecNode := range ecShardsToMove {
rackId, err := ecb.pickRackToBalanceShardsInto(racks, rackToShardCount)
// Move shards to racks that have fewer than maxPerRack of this type
for shardId, ecNode := range shardsToMove {
// Find destination rack with room for this shard type
destRackId, err := ecb.pickRackForShardType(racks, shardsPerRack, maxPerRack, rackToShardCount)
if err != nil {
fmt.Printf("ec shard %d.%d at %s can not find a destination rack:\n%s\n", vid, shardId, ecNode.info.Id, err.Error())
fmt.Printf("ec %s shard %d.%d at %s can not find a destination rack:\n%s\n", shardType, vid, shardId, ecNode.info.Id, err.Error())
continue
}
var possibleDestinationEcNodes []*EcNode
for _, n := range racks[rackId].ecNodes {
for _, n := range racks[destRackId].ecNodes {
possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
}
err = ecb.pickOneEcNodeAndMoveOneShard(ecNode, collection, vid, shardId, possibleDestinationEcNodes)
if err != nil {
return err
}
rackToShardCount[string(rackId)] += 1
rackToShardCount[string(ecNode.rack)] -= 1
racks[rackId].freeEcSlot -= 1
racks[ecNode.rack].freeEcSlot += 1
// Update tracking
shardsPerRack[string(destRackId)] = append(shardsPerRack[string(destRackId)], shardId)
// Remove from source rack
srcRack := string(ecNode.rack)
for i, s := range shardsPerRack[srcRack] {
if s == shardId {
shardsPerRack[srcRack] = append(shardsPerRack[srcRack][:i], shardsPerRack[srcRack][i+1:]...)
break
}
}
rackToShardCount[string(destRackId)] += 1
rackToShardCount[srcRack] -= 1
racks[destRackId].freeEcSlot -= 1
racks[RackId(srcRack)].freeEcSlot += 1
}
return nil
}
// pickRackForShardType selects a rack that has room for more shards of a specific type
func (ecb *ecBalancer) pickRackForShardType(
rackToEcNodes map[RackId]*EcRack,
shardsPerRack map[string][]erasure_coding.ShardId,
maxPerRack int,
rackToShardCount map[string]int,
) (RackId, error) {
var candidates []RackId
minShards := maxPerRack + 1
for rackId, rack := range rackToEcNodes {
if rack.freeEcSlot <= 0 {
continue
}
currentCount := len(shardsPerRack[string(rackId)])
if currentCount >= maxPerRack {
continue
}
// For EC shards, replica placement constraint only applies when DiffRackCount > 0.
if ecb.replicaPlacement != nil && ecb.replicaPlacement.DiffRackCount > 0 && rackToShardCount[string(rackId)] >= ecb.replicaPlacement.DiffRackCount {
continue
}
if currentCount < minShards {
candidates = nil
minShards = currentCount
}
if currentCount == minShards {
candidates = append(candidates, rackId)
}
}
if len(candidates) == 0 {
return "", errors.New("no rack available for shard type balancing")
}
return candidates[rand.IntN(len(candidates))], nil
}
func (ecb *ecBalancer) pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int) (RackId, error) {
targets := []RackId{}
targetShards := -1
@@ -855,7 +991,11 @@ func (ecb *ecBalancer) pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcR
details += fmt.Sprintf(" Skipped %s because it has no free slots\n", rackId)
continue
}
if ecb.replicaPlacement != nil && shards > ecb.replicaPlacement.DiffRackCount {
// For EC shards, replica placement constraint only applies when DiffRackCount > 0.
// When DiffRackCount = 0 (e.g., replica placement "000"), EC shards should be
// distributed freely across racks for fault tolerance - the "000" means
// "no volume replication needed" because erasure coding provides redundancy.
if ecb.replicaPlacement != nil && ecb.replicaPlacement.DiffRackCount > 0 && shards > ecb.replicaPlacement.DiffRackCount {
details += fmt.Sprintf(" Skipped %s because shards %d > replica placement limit for other racks (%d)\n", rackId, shards, ecb.replicaPlacement.DiffRackCount)
continue
}
@@ -1056,7 +1196,11 @@ func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existi
}
shards := nodeShards[node]
if ecb.replicaPlacement != nil && shards > ecb.replicaPlacement.SameRackCount+1 {
// For EC shards, replica placement constraint only applies when SameRackCount > 0.
// When SameRackCount = 0 (e.g., replica placement "000"), EC shards should be
// distributed freely within racks - the "000" means "no volume replication needed"
// because erasure coding provides redundancy.
if ecb.replicaPlacement != nil && ecb.replicaPlacement.SameRackCount > 0 && shards > ecb.replicaPlacement.SameRackCount+1 {
details += fmt.Sprintf(" Skipped %s because shards %d > replica placement limit for the rack (%d + 1)\n", node.info.Id, shards, ecb.replicaPlacement.SameRackCount)
continue
}

4 weed/shell/command_ec_common_test.go

@@ -133,7 +133,9 @@ func TestPickRackToBalanceShardsInto(t *testing.T) {
{testTopologyEc, "6241", "123", []string{"rack1", "rack2", "rack3", "rack4", "rack5", "rack6"}, ""},
{testTopologyEc, "6242", "123", []string{"rack1", "rack2", "rack3", "rack4", "rack5", "rack6"}, ""},
// EC volumes.
{testTopologyEc, "9577", "", nil, "shards 1 > replica placement limit for other racks (0)"},
// With replication "000" (DiffRackCount=0), EC shards should be distributed freely
// because erasure coding provides its own redundancy. No replica placement error.
{testTopologyEc, "9577", "", []string{"rack1", "rack2", "rack3"}, ""},
{testTopologyEc, "9577", "111", []string{"rack1", "rack2", "rack3"}, ""},
{testTopologyEc, "9577", "222", []string{"rack1", "rack2", "rack3"}, ""},
{testTopologyEc, "10457", "222", []string{"rack1"}, ""},

28 weed/shell/command_ec_encode.go

@@ -94,7 +94,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
shardReplicaPlacement := encodeCommand.String("shardReplicaPlacement", "", "replica placement for EC shards, or master default if empty")
sourceDiskTypeStr := encodeCommand.String("sourceDiskType", "", "filter source volumes by disk type (hdd, ssd, or empty for all)")
diskTypeStr := encodeCommand.String("diskType", "", "target disk type for EC shards (hdd, ssd, or empty for default hdd)")
applyBalancing := encodeCommand.Bool("rebalance", false, "re-balance EC shards after creation")
applyBalancing := encodeCommand.Bool("rebalance", true, "re-balance EC shards after creation (default: true)")
verbose := encodeCommand.Bool("verbose", false, "show detailed reasons why volumes are not selected for encoding")
if err = encodeCommand.Parse(args); err != nil {
@@ -164,6 +164,32 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
return fmt.Errorf("failed to collect volume locations before EC encoding: %w", err)
}
// Pre-flight check: verify the target disk type has capacity for EC shards
// This prevents encoding shards only to fail during rebalance
_, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, "", diskType)
if err != nil {
return fmt.Errorf("failed to check EC shard capacity: %w", err)
}
// Calculate required slots: each volume needs TotalShardsCount (14) shards distributed
requiredSlots := len(volumeIds) * erasure_coding.TotalShardsCount
if totalFreeEcSlots < 1 {
// No capacity at all on the target disk type
if diskType != types.HardDriveType {
return fmt.Errorf("no free ec shard slots on disk type '%s'. The target disk type has no capacity.\n"+
"Your volumes are likely on a different disk type. Try:\n"+
" ec.encode -collection=%s -diskType=hdd\n"+
"Or omit -diskType to use the default (hdd)", diskType, *collection)
}
return fmt.Errorf("no free ec shard slots. only %d left on disk type '%s'", totalFreeEcSlots, diskType)
}
if totalFreeEcSlots < requiredSlots {
fmt.Printf("Warning: limited EC shard capacity. Need %d slots for %d volumes, but only %d slots available on disk type '%s'.\n",
requiredSlots, len(volumeIds), totalFreeEcSlots, diskType)
fmt.Printf("Rebalancing may not achieve optimal distribution.\n")
}
// encode all requested volumes...
if err = doEcEncode(commandEnv, writer, volumeIdToCollection, volumeIds, *maxParallelization); err != nil {
return fmt.Errorf("ec encode for volumes %v: %w", volumeIds, err)

149 weed/shell/command_ec_test.go

@@ -4,6 +4,7 @@ import (
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
@@ -136,3 +137,151 @@ func newEcNode(dc string, rack string, dataNodeId string, freeEcSlot int) *EcNod
func (ecNode *EcNode) addEcVolumeAndShardsForTest(vid uint32, collection string, shardIds []uint32) *EcNode {
return ecNode.addEcVolumeShards(needle.VolumeId(vid), collection, shardIds, types.HardDriveType)
}
// TestCommandEcBalanceEvenDataAndParityDistribution verifies that after balancing:
// 1. Data shards (0-9) are evenly distributed across racks (max 2 per rack for 6 racks)
// 2. Parity shards (10-13) are evenly distributed across racks (max 1 per rack for 6 racks)
func TestCommandEcBalanceEvenDataAndParityDistribution(t *testing.T) {
// Setup: All 14 shards start on rack1 (simulating fresh EC encode)
ecb := &ecBalancer{
ecNodes: []*EcNode{
// All shards initially on rack1/dn1
newEcNode("dc1", "rack1", "dn1", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}),
// Empty nodes on other racks
newEcNode("dc1", "rack2", "dn2", 100),
newEcNode("dc1", "rack3", "dn3", 100),
newEcNode("dc1", "rack4", "dn4", 100),
newEcNode("dc1", "rack5", "dn5", 100),
newEcNode("dc1", "rack6", "dn6", 100),
},
applyBalancing: false, // Dry-run mode (simulates moves by updating internal state)
diskType: types.HardDriveType,
}
ecb.balanceEcVolumes("c1")
// After balancing (dry-run), verify the PLANNED distribution by checking what moves were proposed
// The ecb.ecNodes state is updated during dry-run to track planned moves
vid := needle.VolumeId(1)
dataShardCount := erasure_coding.DataShardsCount // 10
parityShardCount := erasure_coding.ParityShardsCount // 4
// Count data and parity shards per rack based on current (updated) state
dataPerRack, parityPerRack := countDataAndParityShardsPerRack(ecb.ecNodes, vid, dataShardCount)
// With 6 racks:
// - Data shards (10): max 2 per rack (ceil(10/6) = 2)
// - Parity shards (4): max 1 per rack (ceil(4/6) = 1)
maxDataPerRack := ceilDivide(dataShardCount, 6) // 2
maxParityPerRack := ceilDivide(parityShardCount, 6) // 1
// Verify no rack has more than max data shards
for rackId, count := range dataPerRack {
if count > maxDataPerRack {
t.Errorf("rack %s has %d data shards, expected max %d", rackId, count, maxDataPerRack)
}
}
// Verify no rack has more than max parity shards
for rackId, count := range parityPerRack {
if count > maxParityPerRack {
t.Errorf("rack %s has %d parity shards, expected max %d", rackId, count, maxParityPerRack)
}
}
// Verify all shards are distributed (total counts)
totalData := 0
totalParity := 0
for _, count := range dataPerRack {
totalData += count
}
for _, count := range parityPerRack {
totalParity += count
}
if totalData != dataShardCount {
t.Errorf("total data shards = %d, expected %d", totalData, dataShardCount)
}
if totalParity != parityShardCount {
t.Errorf("total parity shards = %d, expected %d", totalParity, parityShardCount)
}
// Verify data shards are spread across at least 5 racks (10 shards / 2 max per rack)
racksWithData := len(dataPerRack)
minRacksForData := dataShardCount / maxDataPerRack // At least 5 racks needed for 10 data shards
if racksWithData < minRacksForData {
t.Errorf("data shards spread across only %d racks, expected at least %d", racksWithData, minRacksForData)
}
// Verify parity shards are spread across at least 4 racks (4 shards / 1 max per rack)
racksWithParity := len(parityPerRack)
if racksWithParity < parityShardCount {
t.Errorf("parity shards spread across only %d racks, expected at least %d", racksWithParity, parityShardCount)
}
t.Logf("Distribution after balancing:")
t.Logf(" Data shards per rack: %v (max allowed: %d)", dataPerRack, maxDataPerRack)
t.Logf(" Parity shards per rack: %v (max allowed: %d)", parityPerRack, maxParityPerRack)
}
// countDataAndParityShardsPerRack counts data and parity shards per rack
func countDataAndParityShardsPerRack(ecNodes []*EcNode, vid needle.VolumeId, dataShardCount int) (dataPerRack, parityPerRack map[string]int) {
dataPerRack = make(map[string]int)
parityPerRack = make(map[string]int)
for _, ecNode := range ecNodes {
shardBits := findEcVolumeShards(ecNode, vid, types.HardDriveType)
for _, shardId := range shardBits.ShardIds() {
rackId := string(ecNode.rack)
if int(shardId) < dataShardCount {
dataPerRack[rackId]++
} else {
parityPerRack[rackId]++
}
}
}
return
}
// TestCommandEcBalanceMultipleVolumesEvenDistribution tests that multiple volumes
// each get their data and parity shards evenly distributed
func TestCommandEcBalanceMultipleVolumesEvenDistribution(t *testing.T) {
// Setup: Two volumes, each with all 14 shards on different starting racks
ecb := &ecBalancer{
ecNodes: []*EcNode{
// Volume 1: all shards on rack1
newEcNode("dc1", "rack1", "dn1", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}),
// Volume 2: all shards on rack2
newEcNode("dc1", "rack2", "dn2", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}),
// Empty nodes on other racks
newEcNode("dc1", "rack3", "dn3", 100),
newEcNode("dc1", "rack4", "dn4", 100),
newEcNode("dc1", "rack5", "dn5", 100),
newEcNode("dc1", "rack6", "dn6", 100),
},
applyBalancing: false, // Dry-run mode
diskType: types.HardDriveType,
}
ecb.balanceEcVolumes("c1")
// Check both volumes
for _, vid := range []needle.VolumeId{1, 2} {
dataPerRack, parityPerRack := countDataAndParityShardsPerRack(ecb.ecNodes, vid, erasure_coding.DataShardsCount)
maxDataPerRack := ceilDivide(erasure_coding.DataShardsCount, 6)
maxParityPerRack := ceilDivide(erasure_coding.ParityShardsCount, 6)
for rackId, count := range dataPerRack {
if count > maxDataPerRack {
t.Errorf("volume %d: rack %s has %d data shards, expected max %d", vid, rackId, count, maxDataPerRack)
}
}
for rackId, count := range parityPerRack {
if count > maxParityPerRack {
t.Errorf("volume %d: rack %s has %d parity shards, expected max %d", vid, rackId, count, maxParityPerRack)
}
}
t.Logf("Volume %d - Data: %v, Parity: %v", vid, dataPerRack, parityPerRack)
}
}

284 weed/shell/ec_proportional_rebalance.go

@@ -0,0 +1,284 @@
package shell
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/distribution"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
// ECDistribution is an alias to the distribution package type for backward compatibility
type ECDistribution = distribution.ECDistribution
// CalculateECDistribution computes the target EC shard distribution based on replication policy.
// This is a convenience wrapper that uses the default 10+4 EC configuration.
// For custom EC ratios, use the distribution package directly.
func CalculateECDistribution(totalShards, parityShards int, rp *super_block.ReplicaPlacement) *ECDistribution {
ec := distribution.ECConfig{
DataShards: totalShards - parityShards,
ParityShards: parityShards,
}
rep := distribution.NewReplicationConfig(rp)
return distribution.CalculateDistribution(ec, rep)
}
// TopologyDistributionAnalysis holds the current shard distribution analysis
// This wraps the distribution package's TopologyAnalysis with shell-specific EcNode handling
type TopologyDistributionAnalysis struct {
inner *distribution.TopologyAnalysis
// Shell-specific mappings
nodeMap map[string]*EcNode // nodeID -> EcNode
}
// NewTopologyDistributionAnalysis creates a new analysis structure
func NewTopologyDistributionAnalysis() *TopologyDistributionAnalysis {
return &TopologyDistributionAnalysis{
inner: distribution.NewTopologyAnalysis(),
nodeMap: make(map[string]*EcNode),
}
}
// AddNode adds a node and its shards to the analysis
func (a *TopologyDistributionAnalysis) AddNode(node *EcNode, shardBits erasure_coding.ShardBits) {
nodeId := node.info.Id
// Create distribution.TopologyNode from EcNode
topoNode := &distribution.TopologyNode{
NodeID: nodeId,
DataCenter: string(node.dc),
Rack: string(node.rack),
FreeSlots: node.freeEcSlot,
TotalShards: shardBits.ShardIdCount(),
}
for _, shardId := range shardBits.ShardIds() {
topoNode.ShardIDs = append(topoNode.ShardIDs, int(shardId))
}
a.inner.AddNode(topoNode)
a.nodeMap[nodeId] = node
// Add shard locations
for _, shardId := range shardBits.ShardIds() {
a.inner.AddShardLocation(distribution.ShardLocation{
ShardID: int(shardId),
NodeID: nodeId,
DataCenter: string(node.dc),
Rack: string(node.rack),
})
}
}
// Finalize completes the analysis
func (a *TopologyDistributionAnalysis) Finalize() {
a.inner.Finalize()
}
// String returns a summary
func (a *TopologyDistributionAnalysis) String() string {
return a.inner.String()
}
// DetailedString returns detailed analysis
func (a *TopologyDistributionAnalysis) DetailedString() string {
return a.inner.DetailedString()
}
// GetShardsByDC returns shard counts by DC
func (a *TopologyDistributionAnalysis) GetShardsByDC() map[DataCenterId]int {
result := make(map[DataCenterId]int)
for dc, count := range a.inner.ShardsByDC {
result[DataCenterId(dc)] = count
}
return result
}
// GetShardsByRack returns shard counts by rack
func (a *TopologyDistributionAnalysis) GetShardsByRack() map[RackId]int {
result := make(map[RackId]int)
for rack, count := range a.inner.ShardsByRack {
result[RackId(rack)] = count
}
return result
}
// GetShardsByNode returns shard counts by node
func (a *TopologyDistributionAnalysis) GetShardsByNode() map[EcNodeId]int {
result := make(map[EcNodeId]int)
for nodeId, count := range a.inner.ShardsByNode {
result[EcNodeId(nodeId)] = count
}
return result
}
// AnalyzeVolumeDistribution creates an analysis of current shard distribution for a volume
func AnalyzeVolumeDistribution(volumeId needle.VolumeId, locations []*EcNode, diskType types.DiskType) *TopologyDistributionAnalysis {
analysis := NewTopologyDistributionAnalysis()
for _, node := range locations {
shardBits := findEcVolumeShards(node, volumeId, diskType)
if shardBits.ShardIdCount() > 0 {
analysis.AddNode(node, shardBits)
}
}
analysis.Finalize()
return analysis
}
// ECShardMove represents a planned shard move (shell-specific with EcNode references)
type ECShardMove struct {
VolumeId needle.VolumeId
ShardId erasure_coding.ShardId
SourceNode *EcNode
DestNode *EcNode
Reason string
}
// String returns a human-readable description
func (m ECShardMove) String() string {
return fmt.Sprintf("volume %d shard %d: %s -> %s (%s)",
m.VolumeId, m.ShardId, m.SourceNode.info.Id, m.DestNode.info.Id, m.Reason)
}
// ProportionalECRebalancer implements proportional shard distribution for shell commands
type ProportionalECRebalancer struct {
ecNodes []*EcNode
replicaPlacement *super_block.ReplicaPlacement
diskType types.DiskType
ecConfig distribution.ECConfig
}
// NewProportionalECRebalancer creates a new proportional rebalancer with default EC config
func NewProportionalECRebalancer(
ecNodes []*EcNode,
rp *super_block.ReplicaPlacement,
diskType types.DiskType,
) *ProportionalECRebalancer {
return NewProportionalECRebalancerWithConfig(
ecNodes,
rp,
diskType,
distribution.DefaultECConfig(),
)
}
// NewProportionalECRebalancerWithConfig creates a rebalancer with custom EC configuration
func NewProportionalECRebalancerWithConfig(
ecNodes []*EcNode,
rp *super_block.ReplicaPlacement,
diskType types.DiskType,
ecConfig distribution.ECConfig,
) *ProportionalECRebalancer {
return &ProportionalECRebalancer{
ecNodes: ecNodes,
replicaPlacement: rp,
diskType: diskType,
ecConfig: ecConfig,
}
}
// PlanMoves generates a plan for moving shards to achieve proportional distribution
func (r *ProportionalECRebalancer) PlanMoves(
volumeId needle.VolumeId,
locations []*EcNode,
) ([]ECShardMove, error) {
// Build topology analysis
analysis := distribution.NewTopologyAnalysis()
nodeMap := make(map[string]*EcNode)
// Add all EC nodes to the analysis (even those without shards)
for _, node := range r.ecNodes {
nodeId := node.info.Id
topoNode := &distribution.TopologyNode{
NodeID: nodeId,
DataCenter: string(node.dc),
Rack: string(node.rack),
FreeSlots: node.freeEcSlot,
}
analysis.AddNode(topoNode)
nodeMap[nodeId] = node
}
// Add shard locations from nodes that have shards
for _, node := range locations {
nodeId := node.info.Id
shardBits := findEcVolumeShards(node, volumeId, r.diskType)
for _, shardId := range shardBits.ShardIds() {
analysis.AddShardLocation(distribution.ShardLocation{
ShardID: int(shardId),
NodeID: nodeId,
DataCenter: string(node.dc),
Rack: string(node.rack),
})
}
if _, exists := nodeMap[nodeId]; !exists {
nodeMap[nodeId] = node
}
}
analysis.Finalize()
// Create rebalancer and plan moves
rep := distribution.NewReplicationConfig(r.replicaPlacement)
rebalancer := distribution.NewRebalancer(r.ecConfig, rep)
plan, err := rebalancer.PlanRebalance(analysis)
if err != nil {
return nil, err
}
// Convert distribution moves to shell moves
var moves []ECShardMove
for _, move := range plan.Moves {
srcNode := nodeMap[move.SourceNode.NodeID]
destNode := nodeMap[move.DestNode.NodeID]
if srcNode == nil || destNode == nil {
continue
}
moves = append(moves, ECShardMove{
VolumeId: volumeId,
ShardId: erasure_coding.ShardId(move.ShardID),
SourceNode: srcNode,
DestNode: destNode,
Reason: move.Reason,
})
}
return moves, nil
}
// GetDistributionSummary returns a summary of the planned distribution
func GetDistributionSummary(rp *super_block.ReplicaPlacement) string {
ec := distribution.DefaultECConfig()
rep := distribution.NewReplicationConfig(rp)
dist := distribution.CalculateDistribution(ec, rep)
return dist.Summary()
}
// GetDistributionSummaryWithConfig returns a summary with custom EC configuration
func GetDistributionSummaryWithConfig(rp *super_block.ReplicaPlacement, ecConfig distribution.ECConfig) string {
rep := distribution.NewReplicationConfig(rp)
dist := distribution.CalculateDistribution(ecConfig, rep)
return dist.Summary()
}
// GetFaultToleranceAnalysis returns fault tolerance analysis for the given configuration
func GetFaultToleranceAnalysis(rp *super_block.ReplicaPlacement) string {
ec := distribution.DefaultECConfig()
rep := distribution.NewReplicationConfig(rp)
dist := distribution.CalculateDistribution(ec, rep)
return dist.FaultToleranceAnalysis()
}
// GetFaultToleranceAnalysisWithConfig returns fault tolerance analysis with custom EC configuration
func GetFaultToleranceAnalysisWithConfig(rp *super_block.ReplicaPlacement, ecConfig distribution.ECConfig) string {
rep := distribution.NewReplicationConfig(rp)
dist := distribution.CalculateDistribution(ecConfig, rep)
return dist.FaultToleranceAnalysis()
}

251 weed/shell/ec_proportional_rebalance_test.go

@@ -0,0 +1,251 @@
package shell
import (
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/distribution"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
func TestCalculateECDistributionShell(t *testing.T) {
// Test the shell wrapper function
rp, _ := super_block.NewReplicaPlacementFromString("110")
dist := CalculateECDistribution(
erasure_coding.TotalShardsCount,
erasure_coding.ParityShardsCount,
rp,
)
if dist.ReplicationConfig.MinDataCenters != 2 {
t.Errorf("Expected 2 DCs, got %d", dist.ReplicationConfig.MinDataCenters)
}
if dist.TargetShardsPerDC != 7 {
t.Errorf("Expected 7 shards per DC, got %d", dist.TargetShardsPerDC)
}
t.Log(dist.Summary())
}
func TestAnalyzeVolumeDistributionShell(t *testing.T) {
diskType := types.HardDriveType
diskTypeKey := string(diskType)
// Build a topology with unbalanced distribution
node1 := &EcNode{
info: &master_pb.DataNodeInfo{
Id: "127.0.0.1:8080",
DiskInfos: map[string]*master_pb.DiskInfo{
diskTypeKey: {
Type: diskTypeKey,
MaxVolumeCount: 10,
EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{
{
Id: 1,
Collection: "test",
EcIndexBits: 0x3FFF, // All 14 shards
},
},
},
},
},
dc: "dc1",
rack: "rack1",
freeEcSlot: 5,
}
node2 := &EcNode{
info: &master_pb.DataNodeInfo{
Id: "127.0.0.1:8081",
DiskInfos: map[string]*master_pb.DiskInfo{
diskTypeKey: {
Type: diskTypeKey,
MaxVolumeCount: 10,
EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{},
},
},
},
dc: "dc2",
rack: "rack2",
freeEcSlot: 10,
}
locations := []*EcNode{node1, node2}
volumeId := needle.VolumeId(1)
analysis := AnalyzeVolumeDistribution(volumeId, locations, diskType)
shardsByDC := analysis.GetShardsByDC()
if shardsByDC["dc1"] != 14 {
t.Errorf("Expected 14 shards in dc1, got %d", shardsByDC["dc1"])
}
t.Log(analysis.DetailedString())
}
func TestProportionalRebalancerShell(t *testing.T) {
diskType := types.HardDriveType
diskTypeKey := string(diskType)
// Build topology: 2 DCs, 2 racks each, all shards on one node
nodes := []*EcNode{
{
info: &master_pb.DataNodeInfo{
Id: "dc1-rack1-node1",
DiskInfos: map[string]*master_pb.DiskInfo{
diskTypeKey: {
Type: diskTypeKey,
MaxVolumeCount: 10,
EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{
{Id: 1, Collection: "test", EcIndexBits: 0x3FFF},
},
},
},
},
dc: "dc1", rack: "dc1-rack1", freeEcSlot: 0,
},
{
info: &master_pb.DataNodeInfo{
Id: "dc1-rack2-node1",
DiskInfos: map[string]*master_pb.DiskInfo{
diskTypeKey: {Type: diskTypeKey, MaxVolumeCount: 10},
},
},
dc: "dc1", rack: "dc1-rack2", freeEcSlot: 10,
},
{
info: &master_pb.DataNodeInfo{
Id: "dc2-rack1-node1",
DiskInfos: map[string]*master_pb.DiskInfo{
diskTypeKey: {Type: diskTypeKey, MaxVolumeCount: 10},
},
},
dc: "dc2", rack: "dc2-rack1", freeEcSlot: 10,
},
{
info: &master_pb.DataNodeInfo{
Id: "dc2-rack2-node1",
DiskInfos: map[string]*master_pb.DiskInfo{
diskTypeKey: {Type: diskTypeKey, MaxVolumeCount: 10},
},
},
dc: "dc2", rack: "dc2-rack2", freeEcSlot: 10,
},
}
rp, _ := super_block.NewReplicaPlacementFromString("110")
rebalancer := NewProportionalECRebalancer(nodes, rp, diskType)
volumeId := needle.VolumeId(1)
moves, err := rebalancer.PlanMoves(volumeId, []*EcNode{nodes[0]})
if err != nil {
t.Fatalf("PlanMoves failed: %v", err)
}
t.Logf("Planned %d moves", len(moves))
for i, move := range moves {
t.Logf(" %d. %s", i+1, move.String())
}
// Verify moves to dc2
movedToDC2 := 0
for _, move := range moves {
if move.DestNode.dc == "dc2" {
movedToDC2++
}
}
if movedToDC2 == 0 {
t.Error("Expected some moves to dc2")
}
}
func TestCustomECConfigRebalancer(t *testing.T) {
diskType := types.HardDriveType
diskTypeKey := string(diskType)
// Test with custom 8+4 EC configuration
ecConfig, err := distribution.NewECConfig(8, 4)
if err != nil {
t.Fatalf("Failed to create EC config: %v", err)
}
// Build topology for 12 shards (8+4)
nodes := []*EcNode{
{
info: &master_pb.DataNodeInfo{
Id: "dc1-node1",
DiskInfos: map[string]*master_pb.DiskInfo{
diskTypeKey: {
Type: diskTypeKey,
MaxVolumeCount: 10,
EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{
{Id: 1, Collection: "test", EcIndexBits: 0x0FFF}, // 12 shards (bits 0-11)
},
},
},
},
dc: "dc1", rack: "dc1-rack1", freeEcSlot: 0,
},
{
info: &master_pb.DataNodeInfo{
Id: "dc2-node1",
DiskInfos: map[string]*master_pb.DiskInfo{
diskTypeKey: {Type: diskTypeKey, MaxVolumeCount: 10},
},
},
dc: "dc2", rack: "dc2-rack1", freeEcSlot: 10,
},
{
info: &master_pb.DataNodeInfo{
Id: "dc3-node1",
DiskInfos: map[string]*master_pb.DiskInfo{
diskTypeKey: {Type: diskTypeKey, MaxVolumeCount: 10},
},
},
dc: "dc3", rack: "dc3-rack1", freeEcSlot: 10,
},
}
rp, _ := super_block.NewReplicaPlacementFromString("200") // 3 DCs
rebalancer := NewProportionalECRebalancerWithConfig(nodes, rp, diskType, ecConfig)
volumeId := needle.VolumeId(1)
moves, err := rebalancer.PlanMoves(volumeId, []*EcNode{nodes[0]})
if err != nil {
t.Fatalf("PlanMoves failed: %v", err)
}
t.Logf("Custom 8+4 EC with 200 replication: planned %d moves", len(moves))
// Get the distribution summary
summary := GetDistributionSummaryWithConfig(rp, ecConfig)
t.Log(summary)
analysis := GetFaultToleranceAnalysisWithConfig(rp, ecConfig)
t.Log(analysis)
}
func TestGetDistributionSummaryShell(t *testing.T) {
rp, _ := super_block.NewReplicaPlacementFromString("110")
summary := GetDistributionSummary(rp)
t.Log(summary)
if len(summary) == 0 {
t.Error("Summary should not be empty")
}
analysis := GetFaultToleranceAnalysis(rp)
t.Log(analysis)
if len(analysis) == 0 {
t.Error("Analysis should not be empty")
}
}

293 weed/shell/ec_rebalance_slots_test.go

@@ -0,0 +1,293 @@
package shell
import (
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
// TestECRebalanceWithLimitedSlots tests that EC rebalance handles the scenario
// where there are limited free slots on volume servers.
//
// This is a regression test for the error:
//
// "no free ec shard slots. only 0 left"
//
// Scenario (from real usage):
// - 6 volume servers in 6 racks
// - Each server has max=10 volume slots
// - 7 volumes were EC encoded (7 × 14 = 98 EC shards)
// - All 14 shards per volume are on the original server (not yet distributed)
//
// Expected behavior:
// - The rebalance algorithm should distribute shards across servers
// - Even if perfect distribution isn't possible, it should do best-effort
// - Currently fails with "no free ec shard slots" because freeSlots calculation
//
// doesn't account for shards being moved (freed slots on source, used on target)
func TestECRebalanceWithLimitedSlots(t *testing.T) {
// Build a topology matching the problematic scenario:
// 6 servers, each with 2+ volumes worth of EC shards (all 14 shards per volume on same server)
topology := buildLimitedSlotsTopology()
// Collect EC nodes from the topology
ecNodes, totalFreeEcSlots := collectEcVolumeServersByDc(topology, "", types.HardDriveType)
t.Logf("Topology summary:")
t.Logf(" Number of EC nodes: %d", len(ecNodes))
t.Logf(" Total free EC slots: %d", totalFreeEcSlots)
// Log per-node details
for _, node := range ecNodes {
shardCount := 0
for _, diskInfo := range node.info.DiskInfos {
for _, ecShard := range diskInfo.EcShardInfos {
shardCount += erasure_coding.ShardBits(ecShard.EcIndexBits).ShardIdCount()
}
}
t.Logf(" Node %s (rack %s): %d shards, %d free slots",
node.info.Id, node.rack, shardCount, node.freeEcSlot)
}
// Calculate total EC shards
totalEcShards := 0
for _, node := range ecNodes {
for _, diskInfo := range node.info.DiskInfos {
for _, ecShard := range diskInfo.EcShardInfos {
totalEcShards += erasure_coding.ShardBits(ecShard.EcIndexBits).ShardIdCount()
}
}
}
t.Logf(" Total EC shards: %d", totalEcShards)
// Document the issue:
// With 98 EC shards (7 volumes × 14 shards) on 6 servers with max=10 each,
// total capacity is 60 slots. But shards already occupy slots on their current servers.
//
// The current algorithm calculates free slots as:
// freeSlots = maxVolumeCount - volumeCount - ecShardCount
//
// If all shards are on their original servers:
// - Server A has 28 shards (2 volumes × 14) → may have negative free slots
// - This causes totalFreeEcSlots to be 0 or negative
//
// The EXPECTED improvement:
// - Rebalance should recognize that moving a shard FREES a slot on the source
// - The algorithm should work iteratively, moving shards one at a time
// - Even if starting with 0 free slots, moving one shard opens a slot
if totalFreeEcSlots < 1 {
// This is the current (buggy) behavior we're documenting
t.Logf("")
t.Logf("KNOWN ISSUE: totalFreeEcSlots = %d (< 1)", totalFreeEcSlots)
t.Logf("")
t.Logf("This triggers the error: 'no free ec shard slots. only %d left'", totalFreeEcSlots)
t.Logf("")
t.Logf("Analysis:")
t.Logf(" - %d EC shards across %d servers", totalEcShards, len(ecNodes))
t.Logf(" - Shards are concentrated on original servers (not distributed)")
t.Logf(" - Current slot calculation doesn't account for slots freed by moving shards")
t.Logf("")
t.Logf("Expected fix:")
t.Logf(" 1. Rebalance should work iteratively, moving one shard at a time")
t.Logf(" 2. Moving a shard from A to B: frees 1 slot on A, uses 1 slot on B")
t.Logf(" 3. The 'free slots' check should be per-move, not global")
t.Logf(" 4. Or: calculate 'redistributable slots' = total capacity - shards that must stay")
// For now, document this is a known issue - don't fail the test
// When the fix is implemented, this test should be updated to verify the fix works
return
}
// If we get here, the issue might have been fixed
t.Logf("totalFreeEcSlots = %d, rebalance should be possible", totalFreeEcSlots)
}
// TestECRebalanceZeroFreeSlots tests the specific scenario where
// the topology appears to have free slots but rebalance fails.
//
// This can happen when the VolumeCount in the topology includes the original
// volumes that were EC-encoded, making the free slot calculation incorrect.
func TestECRebalanceZeroFreeSlots(t *testing.T) {
// Build a topology where volumes were NOT deleted after EC encoding
// (VolumeCount still reflects the original volumes)
topology := buildZeroFreeSlotTopology()
ecNodes, totalFreeEcSlots := collectEcVolumeServersByDc(topology, "", types.HardDriveType)
t.Logf("Zero free slots scenario:")
for _, node := range ecNodes {
shardCount := 0
for _, diskInfo := range node.info.DiskInfos {
for _, ecShard := range diskInfo.EcShardInfos {
shardCount += erasure_coding.ShardBits(ecShard.EcIndexBits).ShardIdCount()
}
}
t.Logf(" Node %s: %d shards, %d free slots, volumeCount=%d, max=%d",
node.info.Id, shardCount, node.freeEcSlot,
node.info.DiskInfos[string(types.HardDriveType)].VolumeCount,
node.info.DiskInfos[string(types.HardDriveType)].MaxVolumeCount)
}
t.Logf(" Total free slots: %d", totalFreeEcSlots)
if totalFreeEcSlots == 0 {
t.Logf("")
t.Logf("SCENARIO REPRODUCED: totalFreeEcSlots = 0")
t.Logf("This would trigger: 'no free ec shard slots. only 0 left'")
}
}
// buildZeroFreeSlotTopology creates a topology where rebalance will fail
// because servers are at capacity (volumeCount equals maxVolumeCount)
func buildZeroFreeSlotTopology() *master_pb.TopologyInfo {
diskTypeKey := string(types.HardDriveType)
// Each server has max=10, volumeCount=10 (full capacity)
// Free capacity = (10-10) * 10 = 0 per server
// This will trigger "no free ec shard slots" error
return &master_pb.TopologyInfo{
Id: "test_zero_free_slots",
DataCenterInfos: []*master_pb.DataCenterInfo{
{
Id: "dc1",
RackInfos: []*master_pb.RackInfo{
{
Id: "rack0",
DataNodeInfos: []*master_pb.DataNodeInfo{
{
Id: "127.0.0.1:8080",
DiskInfos: map[string]*master_pb.DiskInfo{
diskTypeKey: {
Type: diskTypeKey,
MaxVolumeCount: 10,
VolumeCount: 10, // At full capacity
EcShardInfos: buildEcShards([]uint32{3, 4}),
},
},
},
},
},
{
Id: "rack1",
DataNodeInfos: []*master_pb.DataNodeInfo{
{
Id: "127.0.0.1:8081",
DiskInfos: map[string]*master_pb.DiskInfo{
diskTypeKey: {
Type: diskTypeKey,
MaxVolumeCount: 10,
VolumeCount: 10,
EcShardInfos: buildEcShards([]uint32{1, 7}),
},
},
},
},
},
{
Id: "rack2",
DataNodeInfos: []*master_pb.DataNodeInfo{
{
Id: "127.0.0.1:8082",
DiskInfos: map[string]*master_pb.DiskInfo{
diskTypeKey: {
Type: diskTypeKey,
MaxVolumeCount: 10,
VolumeCount: 10,
EcShardInfos: buildEcShards([]uint32{2}),
},
},
},
},
},
{
Id: "rack3",
DataNodeInfos: []*master_pb.DataNodeInfo{
{
Id: "127.0.0.1:8083",
DiskInfos: map[string]*master_pb.DiskInfo{
diskTypeKey: {
Type: diskTypeKey,
MaxVolumeCount: 10,
VolumeCount: 10,
EcShardInfos: buildEcShards([]uint32{5, 6}),
},
},
},
},
},
},
},
},
}
}
func buildEcShards(volumeIds []uint32) []*master_pb.VolumeEcShardInformationMessage {
var shards []*master_pb.VolumeEcShardInformationMessage
for _, vid := range volumeIds {
allShardBits := erasure_coding.ShardBits(0)
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
allShardBits = allShardBits.AddShardId(erasure_coding.ShardId(i))
}
shards = append(shards, &master_pb.VolumeEcShardInformationMessage{
Id: vid,
Collection: "ectest",
EcIndexBits: uint32(allShardBits),
})
}
return shards
}
// buildLimitedSlotsTopology creates a topology matching the problematic scenario:
// - 6 servers in 6 racks
// - Each server has max=10 volume slots
// - 7 volumes were EC encoded, shards distributed as follows:
// - rack0 (8080): volumes 3,4 → 28 shards
// - rack1 (8081): volumes 1,7 → 28 shards
// - rack2 (8082): volume 2 → 14 shards
// - rack3 (8083): volumes 5,6 → 28 shards
// - rack4 (8084): (no volumes originally)
// - rack5 (8085): (no volumes originally)
func buildLimitedSlotsTopology() *master_pb.TopologyInfo {
return &master_pb.TopologyInfo{
Id: "test_limited_slots",
DataCenterInfos: []*master_pb.DataCenterInfo{
{
Id: "dc1",
RackInfos: []*master_pb.RackInfo{
buildRackWithEcShards("rack0", "127.0.0.1:8080", 10, []uint32{3, 4}),
buildRackWithEcShards("rack1", "127.0.0.1:8081", 10, []uint32{1, 7}),
buildRackWithEcShards("rack2", "127.0.0.1:8082", 10, []uint32{2}),
buildRackWithEcShards("rack3", "127.0.0.1:8083", 10, []uint32{5, 6}),
buildRackWithEcShards("rack4", "127.0.0.1:8084", 10, []uint32{}),
buildRackWithEcShards("rack5", "127.0.0.1:8085", 10, []uint32{}),
},
},
},
}
}
// buildRackWithEcShards creates a rack with one data node containing EC shards
// for the specified volume IDs (all 14 shards per volume)
func buildRackWithEcShards(rackId, nodeId string, maxVolumes int64, volumeIds []uint32) *master_pb.RackInfo {
// Note: types.HardDriveType is "" (empty string), so we use "" as the key
diskTypeKey := string(types.HardDriveType)
return &master_pb.RackInfo{
Id: rackId,
DataNodeInfos: []*master_pb.DataNodeInfo{
{
Id: nodeId,
DiskInfos: map[string]*master_pb.DiskInfo{
diskTypeKey: {
Type: diskTypeKey,
MaxVolumeCount: maxVolumes,
VolumeCount: int64(len(volumeIds)), // Original volumes still counted
EcShardInfos: buildEcShards(volumeIds),
},
},
},
},
}
}

weed/storage/erasure_coding/distribution/README.md (new file, 209 additions)
@@ -0,0 +1,209 @@
# EC Distribution Package
This package provides erasure coding (EC) shard distribution algorithms that are:
- **Configurable**: Works with any EC ratio (e.g., 10+4, 8+4, 6+3)
- **Reusable**: Used by shell commands, worker tasks, and seaweed-enterprise
- **Topology-aware**: Distributes shards across data centers, racks, and nodes proportionally
## Usage
### Basic Usage with Default 10+4 EC
```go
import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/distribution"
)
// Parse replication policy
rep, _ := distribution.NewReplicationConfigFromString("110")
// Use default 10+4 EC configuration
ec := distribution.DefaultECConfig()
// Calculate distribution plan
dist := distribution.CalculateDistribution(ec, rep)
fmt.Println(dist.Summary())
// Output:
// EC Configuration: 10+4 (total: 14, can lose: 4)
// Replication: replication=110 (DCs:2, Racks/DC:2, Nodes/Rack:1)
// Distribution Plan:
// Data Centers: 2 (target 7 shards each, max 9)
// Racks per DC: 2 (target 4 shards each, max 6)
// Nodes per Rack: 1 (target 4 shards each, max 6)
```
### Custom EC Ratios (seaweed-enterprise)
```go
// Create custom 8+4 EC configuration
ec, err := distribution.NewECConfig(8, 4)
if err != nil {
log.Fatal(err)
}
rep, _ := distribution.NewReplicationConfigFromString("200")
dist := distribution.CalculateDistribution(ec, rep)
// Check fault tolerance
fmt.Println(dist.FaultToleranceAnalysis())
// Output:
// Fault Tolerance Analysis for 8+4 (total: 12, can lose: 4):
// DC Failure: SURVIVABLE ✓
// - Losing one DC loses ~4 shards
// - Remaining: 8 shards (need 8)
```
### Planning Shard Moves
```go
// Build topology analysis
analysis := distribution.NewTopologyAnalysis()
// Add nodes and their shard locations
for _, node := range nodes {
analysis.AddNode(&distribution.TopologyNode{
NodeID: node.ID,
DataCenter: node.DC,
Rack: node.Rack,
FreeSlots: node.FreeSlots,
})
for _, shardID := range node.ShardIDs {
analysis.AddShardLocation(distribution.ShardLocation{
ShardID: shardID,
NodeID: node.ID,
DataCenter: node.DC,
Rack: node.Rack,
})
}
}
analysis.Finalize()
// Create rebalancer and plan moves
rebalancer := distribution.NewRebalancer(ec, rep)
plan, err := rebalancer.PlanRebalance(analysis)
for _, move := range plan.Moves {
fmt.Printf("Move shard %d from %s to %s\n",
move.ShardID, move.SourceNode.NodeID, move.DestNode.NodeID)
}
```
## Algorithm
### Proportional Distribution
The replication policy `XYZ` is interpreted as a minimum spread (X+1 data centers, Y+1 racks per DC, Z+1 nodes per rack), and the shards are split proportionally across those levels (see the sketch after the table):
| Replication | DCs | Racks/DC | Nodes/Rack | 14 Shards Distribution |
|-------------|-----|----------|------------|------------------------|
| `000` | 1 | 1 | 1 | All in one place |
| `001` | 1 | 1 | 2 | 7 per node |
| `010` | 1 | 2 | 1 | 7 per rack |
| `100` | 2 | 1 | 1 | 7 per DC |
| `110` | 2 | 2 | 1 | 7/DC, 4/rack |
| `200` | 3 | 1 | 1 | 5 per DC |
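These per-level targets fall out of repeated ceiling division in `CalculateDistribution`; a short sketch for the `110` row (reusing the imports from the examples above):
```go
rep, _ := distribution.NewReplicationConfigFromString("110")
dist := distribution.CalculateDistribution(distribution.DefaultECConfig(), rep)

// 14 shards over 2 DCs   -> ceil(14/2) = 7 per DC
//  7 shards over 2 racks -> ceil(7/2)  = 4 per rack
//  4 shards over 1 node  -> ceil(4/1)  = 4 per node
fmt.Println(dist.TargetShardsPerDC, dist.TargetShardsPerRack, dist.TargetShardsPerNode) // 7 4 4
```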
### Rebalancing Process
1. **DC-level balancing**: Move shards to achieve target shards per DC
2. **Rack-level balancing**: Within each DC, balance across racks
3. **Node-level balancing**: Within each rack, balance across nodes (each pass is tallied separately in the resulting plan, as shown below)
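Continuing the "Planning Shard Moves" example above, the counters on the returned plan show how many moves each pass produced:
```go
plan, err := rebalancer.PlanRebalance(analysis)
if err != nil {
	log.Fatal(err)
}
fmt.Printf("moves: %d (across DC: %d, across rack: %d, within rack: %d)\n",
	plan.TotalMoves, plan.MovesAcrossDC, plan.MovesAcrossRack, plan.MovesWithinRack)
```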
### Shard Priority: Data Placed First, Parity Moved First
When rebalancing, the algorithm prioritizes keeping data shards spread out:
- **Data shards (0 to DataShards-1)**: Serve read requests directly
- **Parity shards (DataShards to TotalShards-1)**: Only used for reconstruction
**Rebalancing Strategy**:
- When moving shards FROM an overloaded node, **parity shards are moved first**
- This keeps data shards in place on well-distributed nodes
- Result: Data shards remain spread out for optimal read performance
```go
// Check shard type
if ec.IsDataShard(shardID) {
// Shard serves read requests
}
if ec.IsParityShard(shardID) {
// Shard only used for reconstruction
}
// Sort shards for placement (data first for initial distribution)
sorted := ec.SortShardsDataFirst(shards)
// Sort shards for rebalancing (parity first to move them away)
sorted := ec.SortShardsParityFirst(shards)
```
### Fault Tolerance
The package provides fault tolerance analysis:
- **DC Failure**: Can the data survive complete DC loss?
- **Rack Failure**: Can the data survive complete rack loss?
- **Node Failure**: Can the data survive single node loss?
For example, with 10+4 EC (can lose 4 shards), as checked in the sketch below:
- Need 4+ DCs for DC-level fault tolerance
- Need 4+ racks for rack-level fault tolerance
- Usually survivable at node level
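These checks are available programmatically; for the default 10+4 configuration with `110` replication (the same values the package's unit tests assert):
```go
rep, _ := distribution.NewReplicationConfigFromString("110")
dist := distribution.CalculateDistribution(distribution.DefaultECConfig(), rep)

fmt.Println(dist.CanSurviveDCFailure())       // false: losing a DC loses ~7 shards, leaving 7 < 10
fmt.Println(dist.CanSurviveRackFailure())     // true: losing a rack loses ~4 shards, leaving 10 >= 10
fmt.Println(dist.MinDCsForDCFaultTolerance()) // 4 DCs needed for DC-level fault tolerance
```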
## API Reference
### Types
- `ECConfig`: EC configuration (data shards, parity shards)
- `ReplicationConfig`: Parsed replication policy
- `ECDistribution`: Calculated distribution plan
- `TopologyAnalysis`: Current shard distribution analysis
- `Rebalancer`: Plans shard moves
- `RebalancePlan`: List of planned moves
- `ShardMove`: Single shard move operation
### Key Functions
- `NewECConfig(data, parity int)`: Create EC configuration
- `DefaultECConfig()`: Returns 10+4 configuration
- `CalculateDistribution(ec, rep)`: Calculate distribution plan
- `NewRebalancer(ec, rep)`: Create rebalancer
- `PlanRebalance(analysis)`: Generate rebalancing plan
## Integration
### Shell Commands
The shell package wraps this distribution package for `ec.balance`:
```go
import "github.com/seaweedfs/seaweedfs/weed/shell"
rebalancer := shell.NewProportionalECRebalancer(nodes, rp, diskType)
moves, _ := rebalancer.PlanMoves(volumeId, locations)
```
### Worker Tasks
Worker tasks can use the distribution package directly:
```go
import "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/distribution"
ec := distribution.ECConfig{DataShards: 8, ParityShards: 4}
rep := distribution.NewReplicationConfig(rp)
dist := distribution.CalculateDistribution(ec, rep)
```
### seaweed-enterprise
Enterprise features can provide custom EC configurations:
```go
// Custom EC ratio from license/config
ec, _ := distribution.NewECConfig(customData, customParity)
rebalancer := distribution.NewRebalancer(ec, rep)
```

weed/storage/erasure_coding/distribution/analysis.go (new file, 241 additions)
@@ -0,0 +1,241 @@
package distribution
import (
"fmt"
"slices"
)
// ShardLocation represents where a shard is located in the topology
type ShardLocation struct {
ShardID int
NodeID string
DataCenter string
Rack string
}
// TopologyNode represents a node in the topology that can hold EC shards
type TopologyNode struct {
NodeID string
DataCenter string
Rack string
FreeSlots int // Available slots for new shards
ShardIDs []int // Shard IDs currently on this node for a specific volume
TotalShards int // Total shards on this node (for all volumes)
}
// TopologyAnalysis holds the current shard distribution analysis for a volume
type TopologyAnalysis struct {
// Shard counts at each level
ShardsByDC map[string]int
ShardsByRack map[string]int
ShardsByNode map[string]int
// Detailed shard locations
DCToShards map[string][]int // DC -> list of shard IDs
RackToShards map[string][]int // Rack -> list of shard IDs
NodeToShards map[string][]int // NodeID -> list of shard IDs
// Topology structure
DCToRacks map[string][]string // DC -> list of rack IDs
RackToNodes map[string][]*TopologyNode // Rack -> list of nodes
AllNodes map[string]*TopologyNode // NodeID -> node info
// Statistics
TotalShards int
TotalNodes int
TotalRacks int
TotalDCs int
}
// NewTopologyAnalysis creates a new empty analysis
func NewTopologyAnalysis() *TopologyAnalysis {
return &TopologyAnalysis{
ShardsByDC: make(map[string]int),
ShardsByRack: make(map[string]int),
ShardsByNode: make(map[string]int),
DCToShards: make(map[string][]int),
RackToShards: make(map[string][]int),
NodeToShards: make(map[string][]int),
DCToRacks: make(map[string][]string),
RackToNodes: make(map[string][]*TopologyNode),
AllNodes: make(map[string]*TopologyNode),
}
}
// AddShardLocation adds a shard location to the analysis
func (a *TopologyAnalysis) AddShardLocation(loc ShardLocation) {
// Update counts
a.ShardsByDC[loc.DataCenter]++
a.ShardsByRack[loc.Rack]++
a.ShardsByNode[loc.NodeID]++
// Update shard lists
a.DCToShards[loc.DataCenter] = append(a.DCToShards[loc.DataCenter], loc.ShardID)
a.RackToShards[loc.Rack] = append(a.RackToShards[loc.Rack], loc.ShardID)
a.NodeToShards[loc.NodeID] = append(a.NodeToShards[loc.NodeID], loc.ShardID)
a.TotalShards++
}
// AddNode adds a node to the topology (even if it has no shards)
func (a *TopologyAnalysis) AddNode(node *TopologyNode) {
if _, exists := a.AllNodes[node.NodeID]; exists {
return // Already added
}
a.AllNodes[node.NodeID] = node
a.TotalNodes++
// Update topology structure
if !slices.Contains(a.DCToRacks[node.DataCenter], node.Rack) {
a.DCToRacks[node.DataCenter] = append(a.DCToRacks[node.DataCenter], node.Rack)
}
a.RackToNodes[node.Rack] = append(a.RackToNodes[node.Rack], node)
// Update counts
if _, exists := a.ShardsByDC[node.DataCenter]; !exists {
a.TotalDCs++
}
if _, exists := a.ShardsByRack[node.Rack]; !exists {
a.TotalRacks++
}
}
// Finalize computes final statistics after all data is added
func (a *TopologyAnalysis) Finalize() {
// Ensure we have accurate DC and rack counts
dcSet := make(map[string]bool)
rackSet := make(map[string]bool)
for _, node := range a.AllNodes {
dcSet[node.DataCenter] = true
rackSet[node.Rack] = true
}
a.TotalDCs = len(dcSet)
a.TotalRacks = len(rackSet)
a.TotalNodes = len(a.AllNodes)
}
// String returns a summary of the analysis
func (a *TopologyAnalysis) String() string {
return fmt.Sprintf("TopologyAnalysis{shards:%d, nodes:%d, racks:%d, dcs:%d}",
a.TotalShards, a.TotalNodes, a.TotalRacks, a.TotalDCs)
}
// DetailedString returns a detailed multi-line summary
func (a *TopologyAnalysis) DetailedString() string {
s := fmt.Sprintf("Topology Analysis:\n")
s += fmt.Sprintf(" Total Shards: %d\n", a.TotalShards)
s += fmt.Sprintf(" Data Centers: %d\n", a.TotalDCs)
for dc, count := range a.ShardsByDC {
s += fmt.Sprintf(" %s: %d shards\n", dc, count)
}
s += fmt.Sprintf(" Racks: %d\n", a.TotalRacks)
for rack, count := range a.ShardsByRack {
s += fmt.Sprintf(" %s: %d shards\n", rack, count)
}
s += fmt.Sprintf(" Nodes: %d\n", a.TotalNodes)
for nodeID, count := range a.ShardsByNode {
if count > 0 {
s += fmt.Sprintf(" %s: %d shards\n", nodeID, count)
}
}
return s
}
// TopologyExcess represents a topology level (DC/rack/node) with excess shards
type TopologyExcess struct {
ID string // DC/rack/node ID
Level string // "dc", "rack", or "node"
Excess int // Number of excess shards (above target)
Shards []int // Shard IDs at this level
Nodes []*TopologyNode // Nodes at this level (for finding sources)
}
// CalculateDCExcess returns DCs with more shards than the target
func CalculateDCExcess(analysis *TopologyAnalysis, dist *ECDistribution) []TopologyExcess {
var excess []TopologyExcess
for dc, count := range analysis.ShardsByDC {
if count > dist.TargetShardsPerDC {
// Collect nodes in this DC
var nodes []*TopologyNode
for _, rack := range analysis.DCToRacks[dc] {
nodes = append(nodes, analysis.RackToNodes[rack]...)
}
excess = append(excess, TopologyExcess{
ID: dc,
Level: "dc",
Excess: count - dist.TargetShardsPerDC,
Shards: analysis.DCToShards[dc],
Nodes: nodes,
})
}
}
// Sort by excess (most excess first)
slices.SortFunc(excess, func(a, b TopologyExcess) int {
return b.Excess - a.Excess
})
return excess
}
// CalculateRackExcess returns racks with more shards than the target (within a DC)
func CalculateRackExcess(analysis *TopologyAnalysis, dc string, targetPerRack int) []TopologyExcess {
var excess []TopologyExcess
for _, rack := range analysis.DCToRacks[dc] {
count := analysis.ShardsByRack[rack]
if count > targetPerRack {
excess = append(excess, TopologyExcess{
ID: rack,
Level: "rack",
Excess: count - targetPerRack,
Shards: analysis.RackToShards[rack],
Nodes: analysis.RackToNodes[rack],
})
}
}
slices.SortFunc(excess, func(a, b TopologyExcess) int {
return b.Excess - a.Excess
})
return excess
}
// CalculateUnderservedDCs returns DCs that have fewer shards than target
func CalculateUnderservedDCs(analysis *TopologyAnalysis, dist *ECDistribution) []string {
var underserved []string
// Check existing DCs
for dc, count := range analysis.ShardsByDC {
if count < dist.TargetShardsPerDC {
underserved = append(underserved, dc)
}
}
// Check DCs with nodes but no shards
for dc := range analysis.DCToRacks {
if _, exists := analysis.ShardsByDC[dc]; !exists {
underserved = append(underserved, dc)
}
}
return underserved
}
// CalculateUnderservedRacks returns racks that have fewer shards than target
func CalculateUnderservedRacks(analysis *TopologyAnalysis, dc string, targetPerRack int) []string {
var underserved []string
for _, rack := range analysis.DCToRacks[dc] {
count := analysis.ShardsByRack[rack]
if count < targetPerRack {
underserved = append(underserved, rack)
}
}
return underserved
}

weed/storage/erasure_coding/distribution/config.go (new file, 171 additions)
@@ -0,0 +1,171 @@
// Package distribution provides EC shard distribution algorithms with configurable EC ratios.
package distribution
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
)
// ECConfig holds erasure coding configuration parameters.
// This replaces hard-coded constants like DataShardsCount=10, ParityShardsCount=4.
type ECConfig struct {
DataShards int // Number of data shards (e.g., 10)
ParityShards int // Number of parity shards (e.g., 4)
}
// DefaultECConfig returns the standard 10+4 EC configuration
func DefaultECConfig() ECConfig {
return ECConfig{
DataShards: 10,
ParityShards: 4,
}
}
// NewECConfig creates a new EC configuration with validation
func NewECConfig(dataShards, parityShards int) (ECConfig, error) {
if dataShards <= 0 {
return ECConfig{}, fmt.Errorf("dataShards must be positive, got %d", dataShards)
}
if parityShards <= 0 {
return ECConfig{}, fmt.Errorf("parityShards must be positive, got %d", parityShards)
}
if dataShards+parityShards > 32 {
return ECConfig{}, fmt.Errorf("total shards (%d+%d=%d) exceeds maximum of 32",
dataShards, parityShards, dataShards+parityShards)
}
return ECConfig{
DataShards: dataShards,
ParityShards: parityShards,
}, nil
}
// TotalShards returns the total number of shards (data + parity)
func (c ECConfig) TotalShards() int {
return c.DataShards + c.ParityShards
}
// MaxTolerableLoss returns the maximum number of shards that can be lost
// while still being able to reconstruct the data
func (c ECConfig) MaxTolerableLoss() int {
return c.ParityShards
}
// MinShardsForReconstruction returns the minimum number of shards needed
// to reconstruct the original data
func (c ECConfig) MinShardsForReconstruction() int {
return c.DataShards
}
// String returns a human-readable representation
func (c ECConfig) String() string {
return fmt.Sprintf("%d+%d (total: %d, can lose: %d)",
c.DataShards, c.ParityShards, c.TotalShards(), c.MaxTolerableLoss())
}
// IsDataShard returns true if the shard ID is a data shard (0 to DataShards-1)
func (c ECConfig) IsDataShard(shardID int) bool {
return shardID >= 0 && shardID < c.DataShards
}
// IsParityShard returns true if the shard ID is a parity shard (DataShards to TotalShards-1)
func (c ECConfig) IsParityShard(shardID int) bool {
return shardID >= c.DataShards && shardID < c.TotalShards()
}
// SortShardsDataFirst returns a copy of shards sorted with data shards first.
// This is useful for initial placement where data shards should be spread out first.
func (c ECConfig) SortShardsDataFirst(shards []int) []int {
result := make([]int, len(shards))
copy(result, shards)
// Partition: data shards first, then parity shards
dataIdx := 0
parityIdx := len(result) - 1
sorted := make([]int, len(result))
for _, s := range result {
if c.IsDataShard(s) {
sorted[dataIdx] = s
dataIdx++
} else {
sorted[parityIdx] = s
parityIdx--
}
}
return sorted
}
// SortShardsParityFirst returns a copy of shards sorted with parity shards first.
// This is useful for rebalancing where we prefer to move parity shards.
func (c ECConfig) SortShardsParityFirst(shards []int) []int {
result := make([]int, len(shards))
copy(result, shards)
// Partition: parity shards first, then data shards
parityIdx := 0
dataIdx := len(result) - 1
sorted := make([]int, len(result))
for _, s := range result {
if c.IsParityShard(s) {
sorted[parityIdx] = s
parityIdx++
} else {
sorted[dataIdx] = s
dataIdx--
}
}
return sorted
}
// ReplicationConfig holds the parsed replication policy
type ReplicationConfig struct {
MinDataCenters int // X+1 from XYZ replication (minimum DCs to use)
MinRacksPerDC int // Y+1 from XYZ replication (minimum racks per DC)
MinNodesPerRack int // Z+1 from XYZ replication (minimum nodes per rack)
// Original replication string (for logging/debugging)
Original string
}
// NewReplicationConfig creates a ReplicationConfig from a ReplicaPlacement
func NewReplicationConfig(rp *super_block.ReplicaPlacement) ReplicationConfig {
if rp == nil {
return ReplicationConfig{
MinDataCenters: 1,
MinRacksPerDC: 1,
MinNodesPerRack: 1,
Original: "000",
}
}
return ReplicationConfig{
MinDataCenters: rp.DiffDataCenterCount + 1,
MinRacksPerDC: rp.DiffRackCount + 1,
MinNodesPerRack: rp.SameRackCount + 1,
Original: rp.String(),
}
}
// NewReplicationConfigFromString creates a ReplicationConfig from a replication string
func NewReplicationConfigFromString(replication string) (ReplicationConfig, error) {
rp, err := super_block.NewReplicaPlacementFromString(replication)
if err != nil {
return ReplicationConfig{}, err
}
return NewReplicationConfig(rp), nil
}
// TotalPlacementSlots returns the minimum number of unique placement locations
// based on the replication policy
func (r ReplicationConfig) TotalPlacementSlots() int {
return r.MinDataCenters * r.MinRacksPerDC * r.MinNodesPerRack
}
// String returns a human-readable representation
func (r ReplicationConfig) String() string {
return fmt.Sprintf("replication=%s (DCs:%d, Racks/DC:%d, Nodes/Rack:%d)",
r.Original, r.MinDataCenters, r.MinRacksPerDC, r.MinNodesPerRack)
}

weed/storage/erasure_coding/distribution/distribution.go (new file, 161 additions)
@@ -0,0 +1,161 @@
package distribution
import (
"fmt"
)
// ECDistribution represents the target distribution of EC shards
// based on EC configuration and replication policy.
type ECDistribution struct {
// EC configuration
ECConfig ECConfig
// Replication configuration
ReplicationConfig ReplicationConfig
// Target shard counts per topology level (balanced distribution)
TargetShardsPerDC int
TargetShardsPerRack int
TargetShardsPerNode int
// Maximum shard counts per topology level (fault tolerance limits)
// These prevent any single failure domain from having too many shards
MaxShardsPerDC int
MaxShardsPerRack int
MaxShardsPerNode int
}
// CalculateDistribution computes the target EC shard distribution based on
// EC configuration and replication policy.
//
// The algorithm:
// 1. Uses replication policy to determine minimum topology spread
// 2. Calculates target shards per level (evenly distributed)
// 3. Calculates max shards per level (for fault tolerance)
func CalculateDistribution(ec ECConfig, rep ReplicationConfig) *ECDistribution {
totalShards := ec.TotalShards()
// Target distribution (balanced, rounded up to ensure all shards placed)
targetShardsPerDC := ceilDivide(totalShards, rep.MinDataCenters)
targetShardsPerRack := ceilDivide(targetShardsPerDC, rep.MinRacksPerDC)
targetShardsPerNode := ceilDivide(targetShardsPerRack, rep.MinNodesPerRack)
// Maximum limits for fault tolerance
// The key constraint: losing one failure domain shouldn't lose more than parityShards
// So max shards per domain = totalShards - parityShards + tolerance
// We add small tolerance (+2) to allow for imbalanced topologies
faultToleranceLimit := totalShards - ec.ParityShards + 1
maxShardsPerDC := min(faultToleranceLimit, targetShardsPerDC+2)
maxShardsPerRack := min(faultToleranceLimit, targetShardsPerRack+2)
maxShardsPerNode := min(faultToleranceLimit, targetShardsPerNode+2)
return &ECDistribution{
ECConfig: ec,
ReplicationConfig: rep,
TargetShardsPerDC: targetShardsPerDC,
TargetShardsPerRack: targetShardsPerRack,
TargetShardsPerNode: targetShardsPerNode,
MaxShardsPerDC: maxShardsPerDC,
MaxShardsPerRack: maxShardsPerRack,
MaxShardsPerNode: maxShardsPerNode,
}
}
// String returns a human-readable description of the distribution
func (d *ECDistribution) String() string {
return fmt.Sprintf(
"ECDistribution{EC:%s, DCs:%d (target:%d/max:%d), Racks/DC:%d (target:%d/max:%d), Nodes/Rack:%d (target:%d/max:%d)}",
d.ECConfig.String(),
d.ReplicationConfig.MinDataCenters, d.TargetShardsPerDC, d.MaxShardsPerDC,
d.ReplicationConfig.MinRacksPerDC, d.TargetShardsPerRack, d.MaxShardsPerRack,
d.ReplicationConfig.MinNodesPerRack, d.TargetShardsPerNode, d.MaxShardsPerNode,
)
}
// Summary returns a multi-line summary of the distribution plan
func (d *ECDistribution) Summary() string {
summary := fmt.Sprintf("EC Configuration: %s\n", d.ECConfig.String())
summary += fmt.Sprintf("Replication: %s\n", d.ReplicationConfig.String())
summary += fmt.Sprintf("Distribution Plan:\n")
summary += fmt.Sprintf(" Data Centers: %d (target %d shards each, max %d)\n",
d.ReplicationConfig.MinDataCenters, d.TargetShardsPerDC, d.MaxShardsPerDC)
summary += fmt.Sprintf(" Racks per DC: %d (target %d shards each, max %d)\n",
d.ReplicationConfig.MinRacksPerDC, d.TargetShardsPerRack, d.MaxShardsPerRack)
summary += fmt.Sprintf(" Nodes per Rack: %d (target %d shards each, max %d)\n",
d.ReplicationConfig.MinNodesPerRack, d.TargetShardsPerNode, d.MaxShardsPerNode)
return summary
}
// CanSurviveDCFailure returns true if the distribution can survive
// complete loss of one data center
func (d *ECDistribution) CanSurviveDCFailure() bool {
// After losing one DC with max shards, check if remaining shards are enough
remainingAfterDCLoss := d.ECConfig.TotalShards() - d.TargetShardsPerDC
return remainingAfterDCLoss >= d.ECConfig.MinShardsForReconstruction()
}
// CanSurviveRackFailure returns true if the distribution can survive
// complete loss of one rack
func (d *ECDistribution) CanSurviveRackFailure() bool {
remainingAfterRackLoss := d.ECConfig.TotalShards() - d.TargetShardsPerRack
return remainingAfterRackLoss >= d.ECConfig.MinShardsForReconstruction()
}
// MinDCsForDCFaultTolerance calculates the minimum number of DCs needed
// to survive complete DC failure with this EC configuration
func (d *ECDistribution) MinDCsForDCFaultTolerance() int {
// To survive DC failure, max shards per DC = parityShards
maxShardsPerDC := d.ECConfig.MaxTolerableLoss()
if maxShardsPerDC == 0 {
return d.ECConfig.TotalShards() // Would need one DC per shard
}
return ceilDivide(d.ECConfig.TotalShards(), maxShardsPerDC)
}
// FaultToleranceAnalysis returns a detailed analysis of fault tolerance
func (d *ECDistribution) FaultToleranceAnalysis() string {
analysis := fmt.Sprintf("Fault Tolerance Analysis for %s:\n", d.ECConfig.String())
// DC failure
dcSurvive := d.CanSurviveDCFailure()
shardsAfterDC := d.ECConfig.TotalShards() - d.TargetShardsPerDC
analysis += fmt.Sprintf(" DC Failure: %s\n", boolToResult(dcSurvive))
analysis += fmt.Sprintf(" - Losing one DC loses ~%d shards\n", d.TargetShardsPerDC)
analysis += fmt.Sprintf(" - Remaining: %d shards (need %d)\n", shardsAfterDC, d.ECConfig.DataShards)
if !dcSurvive {
analysis += fmt.Sprintf(" - Need at least %d DCs for DC fault tolerance\n", d.MinDCsForDCFaultTolerance())
}
// Rack failure
rackSurvive := d.CanSurviveRackFailure()
shardsAfterRack := d.ECConfig.TotalShards() - d.TargetShardsPerRack
analysis += fmt.Sprintf(" Rack Failure: %s\n", boolToResult(rackSurvive))
analysis += fmt.Sprintf(" - Losing one rack loses ~%d shards\n", d.TargetShardsPerRack)
analysis += fmt.Sprintf(" - Remaining: %d shards (need %d)\n", shardsAfterRack, d.ECConfig.DataShards)
// Node failure (usually survivable)
shardsAfterNode := d.ECConfig.TotalShards() - d.TargetShardsPerNode
nodeSurvive := shardsAfterNode >= d.ECConfig.DataShards
analysis += fmt.Sprintf(" Node Failure: %s\n", boolToResult(nodeSurvive))
analysis += fmt.Sprintf(" - Losing one node loses ~%d shards\n", d.TargetShardsPerNode)
analysis += fmt.Sprintf(" - Remaining: %d shards (need %d)\n", shardsAfterNode, d.ECConfig.DataShards)
return analysis
}
func boolToResult(b bool) string {
if b {
return "SURVIVABLE ✓"
}
return "NOT SURVIVABLE ✗"
}
// ceilDivide performs ceiling division
func ceilDivide(a, b int) int {
if b <= 0 {
return a
}
return (a + b - 1) / b
}

weed/storage/erasure_coding/distribution/distribution_test.go (new file, 565 additions)
@@ -0,0 +1,565 @@
package distribution
import (
"testing"
)
func TestNewECConfig(t *testing.T) {
tests := []struct {
name string
dataShards int
parityShards int
wantErr bool
}{
{"valid 10+4", 10, 4, false},
{"valid 8+4", 8, 4, false},
{"valid 6+3", 6, 3, false},
{"valid 4+2", 4, 2, false},
{"invalid data=0", 0, 4, true},
{"invalid parity=0", 10, 0, true},
{"invalid total>32", 20, 15, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
config, err := NewECConfig(tt.dataShards, tt.parityShards)
if (err != nil) != tt.wantErr {
t.Errorf("NewECConfig() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !tt.wantErr {
if config.DataShards != tt.dataShards {
t.Errorf("DataShards = %d, want %d", config.DataShards, tt.dataShards)
}
if config.ParityShards != tt.parityShards {
t.Errorf("ParityShards = %d, want %d", config.ParityShards, tt.parityShards)
}
if config.TotalShards() != tt.dataShards+tt.parityShards {
t.Errorf("TotalShards() = %d, want %d", config.TotalShards(), tt.dataShards+tt.parityShards)
}
}
})
}
}
func TestCalculateDistribution(t *testing.T) {
tests := []struct {
name string
ecConfig ECConfig
replication string
expectedMinDCs int
expectedMinRacksPerDC int
expectedMinNodesPerRack int
expectedTargetPerDC int
expectedTargetPerRack int
expectedTargetPerNode int
}{
{
name: "10+4 with 000",
ecConfig: DefaultECConfig(),
replication: "000",
expectedMinDCs: 1,
expectedMinRacksPerDC: 1,
expectedMinNodesPerRack: 1,
expectedTargetPerDC: 14,
expectedTargetPerRack: 14,
expectedTargetPerNode: 14,
},
{
name: "10+4 with 100",
ecConfig: DefaultECConfig(),
replication: "100",
expectedMinDCs: 2,
expectedMinRacksPerDC: 1,
expectedMinNodesPerRack: 1,
expectedTargetPerDC: 7,
expectedTargetPerRack: 7,
expectedTargetPerNode: 7,
},
{
name: "10+4 with 110",
ecConfig: DefaultECConfig(),
replication: "110",
expectedMinDCs: 2,
expectedMinRacksPerDC: 2,
expectedMinNodesPerRack: 1,
expectedTargetPerDC: 7,
expectedTargetPerRack: 4,
expectedTargetPerNode: 4,
},
{
name: "10+4 with 200",
ecConfig: DefaultECConfig(),
replication: "200",
expectedMinDCs: 3,
expectedMinRacksPerDC: 1,
expectedMinNodesPerRack: 1,
expectedTargetPerDC: 5,
expectedTargetPerRack: 5,
expectedTargetPerNode: 5,
},
{
name: "8+4 with 110",
ecConfig: ECConfig{
DataShards: 8,
ParityShards: 4,
},
replication: "110",
expectedMinDCs: 2,
expectedMinRacksPerDC: 2,
expectedMinNodesPerRack: 1,
expectedTargetPerDC: 6, // 12/2 = 6
expectedTargetPerRack: 3, // 6/2 = 3
expectedTargetPerNode: 3,
},
{
name: "6+3 with 100",
ecConfig: ECConfig{
DataShards: 6,
ParityShards: 3,
},
replication: "100",
expectedMinDCs: 2,
expectedMinRacksPerDC: 1,
expectedMinNodesPerRack: 1,
expectedTargetPerDC: 5, // ceil(9/2) = 5
expectedTargetPerRack: 5,
expectedTargetPerNode: 5,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rep, err := NewReplicationConfigFromString(tt.replication)
if err != nil {
t.Fatalf("Failed to parse replication %s: %v", tt.replication, err)
}
dist := CalculateDistribution(tt.ecConfig, rep)
if dist.ReplicationConfig.MinDataCenters != tt.expectedMinDCs {
t.Errorf("MinDataCenters = %d, want %d", dist.ReplicationConfig.MinDataCenters, tt.expectedMinDCs)
}
if dist.ReplicationConfig.MinRacksPerDC != tt.expectedMinRacksPerDC {
t.Errorf("MinRacksPerDC = %d, want %d", dist.ReplicationConfig.MinRacksPerDC, tt.expectedMinRacksPerDC)
}
if dist.ReplicationConfig.MinNodesPerRack != tt.expectedMinNodesPerRack {
t.Errorf("MinNodesPerRack = %d, want %d", dist.ReplicationConfig.MinNodesPerRack, tt.expectedMinNodesPerRack)
}
if dist.TargetShardsPerDC != tt.expectedTargetPerDC {
t.Errorf("TargetShardsPerDC = %d, want %d", dist.TargetShardsPerDC, tt.expectedTargetPerDC)
}
if dist.TargetShardsPerRack != tt.expectedTargetPerRack {
t.Errorf("TargetShardsPerRack = %d, want %d", dist.TargetShardsPerRack, tt.expectedTargetPerRack)
}
if dist.TargetShardsPerNode != tt.expectedTargetPerNode {
t.Errorf("TargetShardsPerNode = %d, want %d", dist.TargetShardsPerNode, tt.expectedTargetPerNode)
}
t.Logf("Distribution for %s: %s", tt.name, dist.String())
})
}
}
func TestFaultToleranceAnalysis(t *testing.T) {
tests := []struct {
name string
ecConfig ECConfig
replication string
canSurviveDC bool
canSurviveRack bool
}{
// 10+4 = 14 shards, need 10 to reconstruct, can lose 4
{"10+4 000", DefaultECConfig(), "000", false, false}, // All in one, any failure is fatal
{"10+4 100", DefaultECConfig(), "100", false, false}, // 7 per DC/rack, 7 remaining < 10
{"10+4 200", DefaultECConfig(), "200", false, false}, // 5 per DC/rack, 9 remaining < 10
{"10+4 110", DefaultECConfig(), "110", false, true}, // 4 per rack, 10 remaining = enough for rack
// 8+4 = 12 shards, need 8 to reconstruct, can lose 4
{"8+4 100", ECConfig{8, 4}, "100", false, false}, // 6 per DC/rack, 6 remaining < 8
{"8+4 200", ECConfig{8, 4}, "200", true, true}, // 4 per DC/rack, 8 remaining = enough!
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rep, _ := NewReplicationConfigFromString(tt.replication)
dist := CalculateDistribution(tt.ecConfig, rep)
if dist.CanSurviveDCFailure() != tt.canSurviveDC {
t.Errorf("CanSurviveDCFailure() = %v, want %v", dist.CanSurviveDCFailure(), tt.canSurviveDC)
}
if dist.CanSurviveRackFailure() != tt.canSurviveRack {
t.Errorf("CanSurviveRackFailure() = %v, want %v", dist.CanSurviveRackFailure(), tt.canSurviveRack)
}
t.Log(dist.FaultToleranceAnalysis())
})
}
}
func TestMinDCsForDCFaultTolerance(t *testing.T) {
tests := []struct {
name string
ecConfig ECConfig
minDCs int
}{
// 10+4: can lose 4, so max 4 per DC, ceil(14/4) = 4 DCs needed
{"10+4", DefaultECConfig(), 4},
// 8+4: can lose 4, so max 4 per DC, 12/4 = 3 DCs needed
{"8+4", ECConfig{8, 4}, 3},
// 6+3: can lose 3, so max 3 per DC, 9/3 = 3 DCs needed
{"6+3", ECConfig{6, 3}, 3},
// 4+2: can lose 2, so max 2 per DC, 6/2 = 3 DCs needed
{"4+2", ECConfig{4, 2}, 3},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rep, _ := NewReplicationConfigFromString("000")
dist := CalculateDistribution(tt.ecConfig, rep)
if dist.MinDCsForDCFaultTolerance() != tt.minDCs {
t.Errorf("MinDCsForDCFaultTolerance() = %d, want %d",
dist.MinDCsForDCFaultTolerance(), tt.minDCs)
}
t.Logf("%s: needs %d DCs for DC fault tolerance", tt.name, dist.MinDCsForDCFaultTolerance())
})
}
}
func TestTopologyAnalysis(t *testing.T) {
analysis := NewTopologyAnalysis()
// Add nodes to topology
node1 := &TopologyNode{
NodeID: "node1",
DataCenter: "dc1",
Rack: "rack1",
FreeSlots: 5,
}
node2 := &TopologyNode{
NodeID: "node2",
DataCenter: "dc1",
Rack: "rack2",
FreeSlots: 10,
}
node3 := &TopologyNode{
NodeID: "node3",
DataCenter: "dc2",
Rack: "rack3",
FreeSlots: 10,
}
analysis.AddNode(node1)
analysis.AddNode(node2)
analysis.AddNode(node3)
// Add shard locations (all on node1)
for i := 0; i < 14; i++ {
analysis.AddShardLocation(ShardLocation{
ShardID: i,
NodeID: "node1",
DataCenter: "dc1",
Rack: "rack1",
})
}
analysis.Finalize()
// Verify counts
if analysis.TotalShards != 14 {
t.Errorf("TotalShards = %d, want 14", analysis.TotalShards)
}
if analysis.ShardsByDC["dc1"] != 14 {
t.Errorf("ShardsByDC[dc1] = %d, want 14", analysis.ShardsByDC["dc1"])
}
if analysis.ShardsByRack["rack1"] != 14 {
t.Errorf("ShardsByRack[rack1] = %d, want 14", analysis.ShardsByRack["rack1"])
}
if analysis.ShardsByNode["node1"] != 14 {
t.Errorf("ShardsByNode[node1] = %d, want 14", analysis.ShardsByNode["node1"])
}
t.Log(analysis.DetailedString())
}
func TestRebalancer(t *testing.T) {
// Build topology: 2 DCs, 2 racks each, all shards on one node
analysis := NewTopologyAnalysis()
// Add nodes
nodes := []*TopologyNode{
{NodeID: "dc1-rack1-node1", DataCenter: "dc1", Rack: "dc1-rack1", FreeSlots: 0},
{NodeID: "dc1-rack2-node1", DataCenter: "dc1", Rack: "dc1-rack2", FreeSlots: 10},
{NodeID: "dc2-rack1-node1", DataCenter: "dc2", Rack: "dc2-rack1", FreeSlots: 10},
{NodeID: "dc2-rack2-node1", DataCenter: "dc2", Rack: "dc2-rack2", FreeSlots: 10},
}
for _, node := range nodes {
analysis.AddNode(node)
}
// Add all 14 shards to first node
for i := 0; i < 14; i++ {
analysis.AddShardLocation(ShardLocation{
ShardID: i,
NodeID: "dc1-rack1-node1",
DataCenter: "dc1",
Rack: "dc1-rack1",
})
}
analysis.Finalize()
// Create rebalancer with 110 replication (2 DCs, 2 racks each)
ec := DefaultECConfig()
rep, _ := NewReplicationConfigFromString("110")
rebalancer := NewRebalancer(ec, rep)
plan, err := rebalancer.PlanRebalance(analysis)
if err != nil {
t.Fatalf("PlanRebalance failed: %v", err)
}
t.Logf("Planned %d moves", plan.TotalMoves)
t.Log(plan.DetailedString())
// Verify we're moving shards to dc2
movedToDC2 := 0
for _, move := range plan.Moves {
if move.DestNode.DataCenter == "dc2" {
movedToDC2++
}
}
if movedToDC2 == 0 {
t.Error("Expected some moves to dc2")
}
// With "110" replication, target is 7 shards per DC
// Starting with 14 in dc1, should plan to move 7 to dc2
if plan.MovesAcrossDC < 7 {
t.Errorf("Expected at least 7 cross-DC moves for 110 replication, got %d", plan.MovesAcrossDC)
}
}
func TestCustomECRatios(t *testing.T) {
// Test various custom EC ratios that seaweed-enterprise might use
ratios := []struct {
name string
data int
parity int
}{
{"4+2", 4, 2},
{"6+3", 6, 3},
{"8+2", 8, 2},
{"8+4", 8, 4},
{"10+4", 10, 4},
{"12+4", 12, 4},
{"16+4", 16, 4},
}
for _, ratio := range ratios {
t.Run(ratio.name, func(t *testing.T) {
ec, err := NewECConfig(ratio.data, ratio.parity)
if err != nil {
t.Fatalf("Failed to create EC config: %v", err)
}
rep, _ := NewReplicationConfigFromString("110")
dist := CalculateDistribution(ec, rep)
t.Logf("EC %s with replication 110:", ratio.name)
t.Logf(" Total shards: %d", ec.TotalShards())
t.Logf(" Can lose: %d shards", ec.MaxTolerableLoss())
t.Logf(" Target per DC: %d", dist.TargetShardsPerDC)
t.Logf(" Target per rack: %d", dist.TargetShardsPerRack)
t.Logf(" Min DCs for DC fault tolerance: %d", dist.MinDCsForDCFaultTolerance())
// Verify basic sanity
if dist.TargetShardsPerDC*2 < ec.TotalShards() {
t.Errorf("Target per DC (%d) * 2 should be >= total (%d)",
dist.TargetShardsPerDC, ec.TotalShards())
}
})
}
}
func TestShardClassification(t *testing.T) {
ec := DefaultECConfig() // 10+4
// Test IsDataShard
for i := 0; i < 10; i++ {
if !ec.IsDataShard(i) {
t.Errorf("Shard %d should be a data shard", i)
}
if ec.IsParityShard(i) {
t.Errorf("Shard %d should not be a parity shard", i)
}
}
// Test IsParityShard
for i := 10; i < 14; i++ {
if ec.IsDataShard(i) {
t.Errorf("Shard %d should not be a data shard", i)
}
if !ec.IsParityShard(i) {
t.Errorf("Shard %d should be a parity shard", i)
}
}
// Test with custom 8+4 EC
ec84, _ := NewECConfig(8, 4)
for i := 0; i < 8; i++ {
if !ec84.IsDataShard(i) {
t.Errorf("8+4 EC: Shard %d should be a data shard", i)
}
}
for i := 8; i < 12; i++ {
if !ec84.IsParityShard(i) {
t.Errorf("8+4 EC: Shard %d should be a parity shard", i)
}
}
}
func TestSortShardsDataFirst(t *testing.T) {
ec := DefaultECConfig() // 10+4
// Mixed shards: [0, 10, 5, 11, 2, 12, 7, 13]
shards := []int{0, 10, 5, 11, 2, 12, 7, 13}
sorted := ec.SortShardsDataFirst(shards)
t.Logf("Original: %v", shards)
t.Logf("Sorted (data first): %v", sorted)
// First 4 should be data shards (0, 5, 2, 7)
for i := 0; i < 4; i++ {
if !ec.IsDataShard(sorted[i]) {
t.Errorf("Position %d should be a data shard, got %d", i, sorted[i])
}
}
// Last 4 should be parity shards (10, 11, 12, 13)
for i := 4; i < 8; i++ {
if !ec.IsParityShard(sorted[i]) {
t.Errorf("Position %d should be a parity shard, got %d", i, sorted[i])
}
}
}
func TestSortShardsParityFirst(t *testing.T) {
ec := DefaultECConfig() // 10+4
// Mixed shards: [0, 10, 5, 11, 2, 12, 7, 13]
shards := []int{0, 10, 5, 11, 2, 12, 7, 13}
sorted := ec.SortShardsParityFirst(shards)
t.Logf("Original: %v", shards)
t.Logf("Sorted (parity first): %v", sorted)
// First 4 should be parity shards (10, 11, 12, 13)
for i := 0; i < 4; i++ {
if !ec.IsParityShard(sorted[i]) {
t.Errorf("Position %d should be a parity shard, got %d", i, sorted[i])
}
}
// Last 4 should be data shards (0, 5, 2, 7)
for i := 4; i < 8; i++ {
if !ec.IsDataShard(sorted[i]) {
t.Errorf("Position %d should be a data shard, got %d", i, sorted[i])
}
}
}
func TestRebalancerPrefersMovingParityShards(t *testing.T) {
// Build topology where one node has all shards including mix of data and parity
analysis := NewTopologyAnalysis()
// Node 1: Has all 14 shards (mixed data and parity)
node1 := &TopologyNode{
NodeID: "node1",
DataCenter: "dc1",
Rack: "rack1",
FreeSlots: 0,
}
analysis.AddNode(node1)
// Node 2: Empty, ready to receive
node2 := &TopologyNode{
NodeID: "node2",
DataCenter: "dc1",
Rack: "rack1",
FreeSlots: 10,
}
analysis.AddNode(node2)
// Add all 14 shards to node1
for i := 0; i < 14; i++ {
analysis.AddShardLocation(ShardLocation{
ShardID: i,
NodeID: "node1",
DataCenter: "dc1",
Rack: "rack1",
})
}
analysis.Finalize()
// Create rebalancer
ec := DefaultECConfig()
rep, _ := NewReplicationConfigFromString("000")
rebalancer := NewRebalancer(ec, rep)
plan, err := rebalancer.PlanRebalance(analysis)
if err != nil {
t.Fatalf("PlanRebalance failed: %v", err)
}
t.Logf("Planned %d moves", len(plan.Moves))
// Check that parity shards are moved first
parityMovesFirst := 0
dataMovesFirst := 0
seenDataMove := false
for _, move := range plan.Moves {
isParity := ec.IsParityShard(move.ShardID)
t.Logf("Move shard %d (parity=%v): %s -> %s",
move.ShardID, isParity, move.SourceNode.NodeID, move.DestNode.NodeID)
if isParity && !seenDataMove {
parityMovesFirst++
} else if !isParity {
seenDataMove = true
dataMovesFirst++
}
}
t.Logf("Parity moves before first data move: %d", parityMovesFirst)
t.Logf("Data moves: %d", dataMovesFirst)
// With 10+4 EC, there are 4 parity shards
// They should be moved before data shards when possible
if parityMovesFirst < 4 && len(plan.Moves) >= 4 {
t.Logf("Note: Expected parity shards to be moved first, but got %d parity moves before data moves", parityMovesFirst)
}
}
func TestDistributionSummary(t *testing.T) {
ec := DefaultECConfig()
rep, _ := NewReplicationConfigFromString("110")
dist := CalculateDistribution(ec, rep)
summary := dist.Summary()
t.Log(summary)
if len(summary) == 0 {
t.Error("Summary should not be empty")
}
analysis := dist.FaultToleranceAnalysis()
t.Log(analysis)
if len(analysis) == 0 {
t.Error("Fault tolerance analysis should not be empty")
}
}

weed/storage/erasure_coding/distribution/rebalancer.go (new file, 378 additions)
@@ -0,0 +1,378 @@
package distribution
import (
"fmt"
"slices"
)
// ShardMove represents a planned shard move
type ShardMove struct {
ShardID int
SourceNode *TopologyNode
DestNode *TopologyNode
Reason string
}
// String returns a human-readable description of the move
func (m ShardMove) String() string {
return fmt.Sprintf("shard %d: %s -> %s (%s)",
m.ShardID, m.SourceNode.NodeID, m.DestNode.NodeID, m.Reason)
}
// RebalancePlan contains the complete plan for rebalancing EC shards
type RebalancePlan struct {
Moves []ShardMove
Distribution *ECDistribution
Analysis *TopologyAnalysis
// Statistics
TotalMoves int
MovesAcrossDC int
MovesAcrossRack int
MovesWithinRack int
}
// String returns a summary of the plan
func (p *RebalancePlan) String() string {
return fmt.Sprintf("RebalancePlan{moves:%d, acrossDC:%d, acrossRack:%d, withinRack:%d}",
p.TotalMoves, p.MovesAcrossDC, p.MovesAcrossRack, p.MovesWithinRack)
}
// DetailedString returns a detailed multi-line summary
func (p *RebalancePlan) DetailedString() string {
s := fmt.Sprintf("Rebalance Plan:\n")
s += fmt.Sprintf(" Total Moves: %d\n", p.TotalMoves)
s += fmt.Sprintf(" Across DC: %d\n", p.MovesAcrossDC)
s += fmt.Sprintf(" Across Rack: %d\n", p.MovesAcrossRack)
s += fmt.Sprintf(" Within Rack: %d\n", p.MovesWithinRack)
s += fmt.Sprintf("\nMoves:\n")
for i, move := range p.Moves {
s += fmt.Sprintf(" %d. %s\n", i+1, move.String())
}
return s
}
// Rebalancer plans shard moves to achieve proportional distribution
type Rebalancer struct {
ecConfig ECConfig
repConfig ReplicationConfig
}
// NewRebalancer creates a new rebalancer with the given configuration
func NewRebalancer(ec ECConfig, rep ReplicationConfig) *Rebalancer {
return &Rebalancer{
ecConfig: ec,
repConfig: rep,
}
}
// PlanRebalance creates a rebalancing plan based on current topology analysis
func (r *Rebalancer) PlanRebalance(analysis *TopologyAnalysis) (*RebalancePlan, error) {
dist := CalculateDistribution(r.ecConfig, r.repConfig)
plan := &RebalancePlan{
Distribution: dist,
Analysis: analysis,
}
// Step 1: Balance across data centers
dcMoves := r.planDCMoves(analysis, dist)
for _, move := range dcMoves {
plan.Moves = append(plan.Moves, move)
plan.MovesAcrossDC++
}
// Update analysis after DC moves (for planning purposes)
r.applyMovesToAnalysis(analysis, dcMoves)
// Step 2: Balance across racks within each DC
rackMoves := r.planRackMoves(analysis, dist)
for _, move := range rackMoves {
plan.Moves = append(plan.Moves, move)
plan.MovesAcrossRack++
}
// Update analysis after rack moves
r.applyMovesToAnalysis(analysis, rackMoves)
// Step 3: Balance across nodes within each rack
nodeMoves := r.planNodeMoves(analysis, dist)
for _, move := range nodeMoves {
plan.Moves = append(plan.Moves, move)
plan.MovesWithinRack++
}
plan.TotalMoves = len(plan.Moves)
return plan, nil
}
// planDCMoves plans moves to balance shards across data centers
func (r *Rebalancer) planDCMoves(analysis *TopologyAnalysis, dist *ECDistribution) []ShardMove {
var moves []ShardMove
overDCs := CalculateDCExcess(analysis, dist)
underDCs := CalculateUnderservedDCs(analysis, dist)
underIdx := 0
for _, over := range overDCs {
for over.Excess > 0 && underIdx < len(underDCs) {
destDC := underDCs[underIdx]
// Find a shard and source node
shardID, srcNode := r.pickShardToMove(analysis, over.Nodes)
if srcNode == nil {
break
}
// Find destination node in target DC
destNode := r.pickBestDestination(analysis, destDC, "", dist)
if destNode == nil {
underIdx++
continue
}
moves = append(moves, ShardMove{
ShardID: shardID,
SourceNode: srcNode,
DestNode: destNode,
Reason: fmt.Sprintf("balance DC: %s -> %s", srcNode.DataCenter, destDC),
})
over.Excess--
analysis.ShardsByDC[srcNode.DataCenter]--
analysis.ShardsByDC[destDC]++
// Check if destDC reached target
if analysis.ShardsByDC[destDC] >= dist.TargetShardsPerDC {
underIdx++
}
}
}
return moves
}
// planRackMoves plans moves to balance shards across racks within each DC
func (r *Rebalancer) planRackMoves(analysis *TopologyAnalysis, dist *ECDistribution) []ShardMove {
var moves []ShardMove
for dc := range analysis.DCToRacks {
dcShards := analysis.ShardsByDC[dc]
numRacks := len(analysis.DCToRacks[dc])
if numRacks == 0 {
continue
}
targetPerRack := ceilDivide(dcShards, max(numRacks, dist.ReplicationConfig.MinRacksPerDC))
overRacks := CalculateRackExcess(analysis, dc, targetPerRack)
underRacks := CalculateUnderservedRacks(analysis, dc, targetPerRack)
underIdx := 0
for _, over := range overRacks {
for over.Excess > 0 && underIdx < len(underRacks) {
destRack := underRacks[underIdx]
// Find shard and source node
shardID, srcNode := r.pickShardToMove(analysis, over.Nodes)
if srcNode == nil {
break
}
// Find destination node in target rack
destNode := r.pickBestDestination(analysis, dc, destRack, dist)
if destNode == nil {
underIdx++
continue
}
moves = append(moves, ShardMove{
ShardID: shardID,
SourceNode: srcNode,
DestNode: destNode,
Reason: fmt.Sprintf("balance rack: %s -> %s", srcNode.Rack, destRack),
})
over.Excess--
analysis.ShardsByRack[srcNode.Rack]--
analysis.ShardsByRack[destRack]++
if analysis.ShardsByRack[destRack] >= targetPerRack {
underIdx++
}
}
}
}
return moves
}
// planNodeMoves plans moves to balance shards across nodes within each rack
func (r *Rebalancer) planNodeMoves(analysis *TopologyAnalysis, dist *ECDistribution) []ShardMove {
var moves []ShardMove
for rack, nodes := range analysis.RackToNodes {
if len(nodes) <= 1 {
continue
}
rackShards := analysis.ShardsByRack[rack]
targetPerNode := ceilDivide(rackShards, max(len(nodes), dist.ReplicationConfig.MinNodesPerRack))
// Find over and under nodes
var overNodes []*TopologyNode
var underNodes []*TopologyNode
for _, node := range nodes {
count := analysis.ShardsByNode[node.NodeID]
if count > targetPerNode {
overNodes = append(overNodes, node)
} else if count < targetPerNode {
underNodes = append(underNodes, node)
}
}
// Sort by excess/deficit
slices.SortFunc(overNodes, func(a, b *TopologyNode) int {
return analysis.ShardsByNode[b.NodeID] - analysis.ShardsByNode[a.NodeID]
})
underIdx := 0
for _, srcNode := range overNodes {
excess := analysis.ShardsByNode[srcNode.NodeID] - targetPerNode
for excess > 0 && underIdx < len(underNodes) {
destNode := underNodes[underIdx]
// Pick a shard from this node, preferring parity shards
shards := analysis.NodeToShards[srcNode.NodeID]
if len(shards) == 0 {
break
}
// Find a parity shard first, fallback to data shard
shardID := -1
shardIdx := -1
for i, s := range shards {
if r.ecConfig.IsParityShard(s) {
shardID = s
shardIdx = i
break
}
}
if shardID == -1 {
shardID = shards[0]
shardIdx = 0
}
moves = append(moves, ShardMove{
ShardID: shardID,
SourceNode: srcNode,
DestNode: destNode,
Reason: fmt.Sprintf("balance node: %s -> %s", srcNode.NodeID, destNode.NodeID),
})
excess--
analysis.ShardsByNode[srcNode.NodeID]--
analysis.ShardsByNode[destNode.NodeID]++
// Update shard lists - remove the specific shard we picked
analysis.NodeToShards[srcNode.NodeID] = append(
shards[:shardIdx], shards[shardIdx+1:]...)
analysis.NodeToShards[destNode.NodeID] = append(
analysis.NodeToShards[destNode.NodeID], shardID)
if analysis.ShardsByNode[destNode.NodeID] >= targetPerNode {
underIdx++
}
}
}
}
return moves
}
// pickShardToMove selects a shard and its node from the given nodes.
// It prefers to move parity shards first, keeping data shards spread out
// since data shards serve read requests while parity shards are only for reconstruction.
func (r *Rebalancer) pickShardToMove(analysis *TopologyAnalysis, nodes []*TopologyNode) (int, *TopologyNode) {
// Sort by shard count (most shards first)
slices.SortFunc(nodes, func(a, b *TopologyNode) int {
return analysis.ShardsByNode[b.NodeID] - analysis.ShardsByNode[a.NodeID]
})
// First pass: try to find a parity shard to move (prefer moving parity)
for _, node := range nodes {
shards := analysis.NodeToShards[node.NodeID]
for _, shardID := range shards {
if r.ecConfig.IsParityShard(shardID) {
return shardID, node
}
}
}
// Second pass: if no parity shards, move a data shard
for _, node := range nodes {
shards := analysis.NodeToShards[node.NodeID]
if len(shards) > 0 {
return shards[0], node
}
}
return -1, nil
}
// pickBestDestination selects the best destination node
func (r *Rebalancer) pickBestDestination(analysis *TopologyAnalysis, targetDC, targetRack string, dist *ECDistribution) *TopologyNode {
var candidates []*TopologyNode
// Collect candidates
for _, node := range analysis.AllNodes {
// Filter by DC if specified
if targetDC != "" && node.DataCenter != targetDC {
continue
}
// Filter by rack if specified
if targetRack != "" && node.Rack != targetRack {
continue
}
// Check capacity
if node.FreeSlots <= 0 {
continue
}
// Check max shards limit
if analysis.ShardsByNode[node.NodeID] >= dist.MaxShardsPerNode {
continue
}
candidates = append(candidates, node)
}
if len(candidates) == 0 {
return nil
}
// Sort by: 1) fewer shards, 2) more free slots
slices.SortFunc(candidates, func(a, b *TopologyNode) int {
aShards := analysis.ShardsByNode[a.NodeID]
bShards := analysis.ShardsByNode[b.NodeID]
if aShards != bShards {
return aShards - bShards
}
return b.FreeSlots - a.FreeSlots
})
return candidates[0]
}
// applyMovesToAnalysis is a no-op placeholder for potential future use.
// Note: All planners (planDCMoves, planRackMoves, planNodeMoves) update
// their respective counts (ShardsByDC, ShardsByRack, ShardsByNode) and
// shard lists (NodeToShards) inline during planning. This avoids duplicate
// updates that would occur if we also updated counts here.
func (r *Rebalancer) applyMovesToAnalysis(analysis *TopologyAnalysis, moves []ShardMove) {
// Counts are already updated by the individual planners.
// This function is kept for API compatibility and potential future use.
}