Add disk-aware EC rebalancing integration tests

- Add TestDiskAwareECRebalancing test with multi-disk cluster setup
- Test EC encode with disk awareness (shows disk ID in output)
- Test EC balance with disk-level shard distribution
- Add helper functions for disk-level verification:
  - startMultiDiskCluster: 3 servers x 4 disks each
  - countShardsPerDisk: track shards per disk per server
  - calculateDiskShardVariance: measure distribution balance
- Verify no single disk is overloaded with shards
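To run only this suite locally, the standard Go test invocation should work
(assuming a weed binary has been built where the test helpers can find it):

    go test -v -timeout 10m -run TestDiskAwareECRebalancing ./test/erasure_coding

With -short the test skips itself.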
Branch: pull/7597/head
chrislusf committed a35db9698e · 1 day ago

test/erasure_coding/ec_integration_test.go (+426 lines)
@@ -5,9 +5,11 @@ import (
    "context"
    "fmt"
    "io"
    "math"
    "os"
    "os/exec"
    "path/filepath"
    "strings"
    "testing"
    "time"
@@ -656,3 +658,427 @@ func TestECEncodingRegressionPrevention(t *testing.T) {
        t.Log("Timing pattern regression test passed")
    })
}
// TestDiskAwareECRebalancing tests EC shard placement across multiple disks per server
// This verifies the disk-aware EC rebalancing feature works correctly
func TestDiskAwareECRebalancing(t *testing.T) {
    if testing.Short() {
        t.Skip("Skipping disk-aware integration test in short mode")
    }

    testDir, err := os.MkdirTemp("", "seaweedfs_disk_aware_ec_test_")
    require.NoError(t, err)
    defer os.RemoveAll(testDir)

    ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
    defer cancel()

    // Start cluster with MULTIPLE DISKS per volume server
    cluster, err := startMultiDiskCluster(ctx, testDir)
    require.NoError(t, err)
    defer cluster.Stop()

    // Wait for servers to be ready
    require.NoError(t, waitForServer("127.0.0.1:9334", 30*time.Second))
    for i := 0; i < 3; i++ {
        require.NoError(t, waitForServer(fmt.Sprintf("127.0.0.1:809%d", i), 30*time.Second))
    }

    // Wait longer for volume servers to register with master and create volumes
    t.Log("Waiting for volume servers to register with master...")
    time.Sleep(10 * time.Second)

    // Create command environment
    options := &shell.ShellOptions{
        Masters:        stringPtr("127.0.0.1:9334"),
        GrpcDialOption: grpc.WithInsecure(),
        FilerGroup:     stringPtr("default"),
    }
    commandEnv := shell.NewCommandEnv(options)

    // Connect to master with longer timeout
    ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
    defer cancel2()
    go commandEnv.MasterClient.KeepConnectedToMaster(ctx2)
    commandEnv.MasterClient.WaitUntilConnected(ctx2)
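    // The goroutine above keeps the master connection alive for as long as
    // ctx2 lives, which covers all of the shell commands issued below.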
    // Wait for master client to fully sync
    time.Sleep(5 * time.Second)

    // Upload test data to create a volume - retry if volumes not ready
    var volumeId needle.VolumeId
    testData := []byte("Disk-aware EC rebalancing test data - this needs to be large enough to create a volume")
    for retry := 0; retry < 5; retry++ {
        volumeId, err = uploadTestDataToMaster(testData, "127.0.0.1:9334")
        if err == nil {
            break
        }
        t.Logf("Upload attempt %d failed: %v, retrying...", retry+1, err)
        time.Sleep(3 * time.Second)
    }
    require.NoError(t, err, "Failed to upload test data after retries")
    t.Logf("Created volume %d for disk-aware EC test", volumeId)

    // Wait for volume to be registered
    time.Sleep(3 * time.Second)

    t.Run("verify_multi_disk_setup", func(t *testing.T) {
        // Verify that each server has multiple disk directories
        for server := 0; server < 3; server++ {
            diskCount := 0
            for disk := 0; disk < 4; disk++ {
                diskDir := filepath.Join(testDir, fmt.Sprintf("server%d_disk%d", server, disk))
                if _, err := os.Stat(diskDir); err == nil {
                    diskCount++
                }
            }
            assert.Equal(t, 4, diskCount, "Server %d should have 4 disk directories", server)
            t.Logf("Server %d has %d disk directories", server, diskCount)
        }
    })

    t.Run("ec_encode_with_disk_awareness", func(t *testing.T) {
        // Get lock first
        lockCmd := shell.Commands[findCommandIndex("lock")]
        var lockOutput bytes.Buffer
        err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
        if err != nil {
            t.Logf("Lock command failed: %v", err)
        }

        // Execute EC encoding
        var output bytes.Buffer
        ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")]
        args := []string{"-volumeId", fmt.Sprintf("%d", volumeId), "-collection", "test", "-force"}
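        // Some of the command's progress output goes straight to
        // os.Stdout/os.Stderr rather than to the writer passed to Do, so both
        // are temporarily redirected through a pipe. This assumes the combined
        // output stays under the OS pipe buffer (typically 64 KiB); anything
        // larger would block the writer until the pipe is drained.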
        // Capture output
        oldStdout := os.Stdout
        oldStderr := os.Stderr
        r, w, _ := os.Pipe()
        os.Stdout = w
        os.Stderr = w

        err = ecEncodeCmd.Do(args, commandEnv, &output)

        w.Close()
        os.Stdout = oldStdout
        os.Stderr = oldStderr
        capturedOutput, _ := io.ReadAll(r)
        outputStr := string(capturedOutput) + output.String()
        t.Logf("EC encode output:\n%s", outputStr)

        if err != nil {
            t.Logf("EC encoding completed with error: %v", err)
        } else {
            t.Logf("EC encoding completed successfully")
        }
    })
t.Run("verify_disk_level_shard_distribution", func(t *testing.T) {
// Wait for shards to be distributed
time.Sleep(2 * time.Second)
// Count shards on each disk of each server
diskDistribution := countShardsPerDisk(testDir, uint32(volumeId))
totalShards := 0
disksWithShards := 0
maxShardsOnSingleDisk := 0
t.Logf("Disk-level shard distribution for volume %d:", volumeId)
for server, disks := range diskDistribution {
for diskId, shardCount := range disks {
if shardCount > 0 {
t.Logf(" %s disk %d: %d shards", server, diskId, shardCount)
totalShards += shardCount
disksWithShards++
if shardCount > maxShardsOnSingleDisk {
maxShardsOnSingleDisk = shardCount
}
}
}
}
t.Logf("Summary: %d total shards across %d disks (max %d on single disk)",
totalShards, disksWithShards, maxShardsOnSingleDisk)
// EC creates 14 shards (10 data + 4 parity), plus .ecx and .ecj files
// We should see shards distributed across multiple disks
if disksWithShards > 1 {
t.Logf("PASS: Shards distributed across %d disks", disksWithShards)
} else {
t.Logf("INFO: Shards on %d disk(s) - may be expected if volume was on single disk", disksWithShards)
}
})
t.Run("test_ec_balance_disk_awareness", func(t *testing.T) {
// Calculate initial disk balance variance
initialDistribution := countShardsPerDisk(testDir, uint32(volumeId))
initialVariance := calculateDiskShardVariance(initialDistribution)
t.Logf("Initial disk shard variance: %.2f", initialVariance)
// Run ec.balance command
var output bytes.Buffer
ecBalanceCmd := shell.Commands[findCommandIndex("ec.balance")]
oldStdout := os.Stdout
oldStderr := os.Stderr
r, w, _ := os.Pipe()
os.Stdout = w
os.Stderr = w
err := ecBalanceCmd.Do([]string{"-force"}, commandEnv, &output)
w.Close()
os.Stdout = oldStdout
os.Stderr = oldStderr
capturedOutput, _ := io.ReadAll(r)
outputStr := string(capturedOutput) + output.String()
if err != nil {
t.Logf("ec.balance error: %v", err)
}
t.Logf("ec.balance output:\n%s", outputStr)
// Wait for balance to complete
time.Sleep(2 * time.Second)
// Calculate final disk balance variance
finalDistribution := countShardsPerDisk(testDir, uint32(volumeId))
finalVariance := calculateDiskShardVariance(finalDistribution)
t.Logf("Final disk shard variance: %.2f", finalVariance)
t.Logf("Variance change: %.2f -> %.2f", initialVariance, finalVariance)
    })
t.Run("verify_no_disk_overload", func(t *testing.T) {
// Verify that no single disk has too many shards of the same volume
diskDistribution := countShardsPerDisk(testDir, uint32(volumeId))
for server, disks := range diskDistribution {
for diskId, shardCount := range disks {
// With 14 EC shards and 12 disks (3 servers x 4 disks), ideally ~1-2 shards per disk
// Allow up to 4 shards per disk as a reasonable threshold
if shardCount > 4 {
t.Logf("WARNING: %s disk %d has %d shards (may indicate imbalance)",
server, diskId, shardCount)
}
}
}
})
}
// MultiDiskCluster represents a test cluster with multiple disks per volume server
type MultiDiskCluster struct {
    masterCmd     *exec.Cmd
    volumeServers []*exec.Cmd
    testDir       string
}
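// Stop kills the cluster processes outright; a hard kill is acceptable here
// because the test directory is removed afterwards and no graceful shutdown
// is needed.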
func (c *MultiDiskCluster) Stop() {
    // Stop volume servers first
    for _, cmd := range c.volumeServers {
        if cmd != nil && cmd.Process != nil {
            cmd.Process.Kill()
            cmd.Wait()
        }
    }
    // Stop master server
    if c.masterCmd != nil && c.masterCmd.Process != nil {
        c.masterCmd.Process.Kill()
        c.masterCmd.Wait()
    }
}
// startMultiDiskCluster starts a SeaweedFS cluster with multiple disks per volume server
func startMultiDiskCluster(ctx context.Context, dataDir string) (*MultiDiskCluster, error) {
    weedBinary := findWeedBinary()
    if weedBinary == "" {
        return nil, fmt.Errorf("weed binary not found")
    }

    cluster := &MultiDiskCluster{testDir: dataDir}

    // Create master directory
    masterDir := filepath.Join(dataDir, "master")
    os.MkdirAll(masterDir, 0755)

    // Start master server on a different port to avoid conflict
    masterCmd := exec.CommandContext(ctx, weedBinary, "master",
        "-port", "9334",
        "-mdir", masterDir,
        "-volumeSizeLimitMB", "10",
        "-ip", "127.0.0.1",
    )
    masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log"))
    if err != nil {
        return nil, fmt.Errorf("failed to create master log file: %v", err)
    }
    masterCmd.Stdout = masterLogFile
    masterCmd.Stderr = masterLogFile

    if err := masterCmd.Start(); err != nil {
        return nil, fmt.Errorf("failed to start master server: %v", err)
    }
    cluster.masterCmd = masterCmd

    // Wait for master to be ready
    time.Sleep(2 * time.Second)
    // Start 3 volume servers, each with 4 disks
    const numServers = 3
    const disksPerServer = 4

    for i := 0; i < numServers; i++ {
        // Create 4 disk directories per server
        var diskDirs []string
        var maxVolumes []string
        for d := 0; d < disksPerServer; d++ {
            diskDir := filepath.Join(dataDir, fmt.Sprintf("server%d_disk%d", i, d))
            if err := os.MkdirAll(diskDir, 0755); err != nil {
                cluster.Stop()
                return nil, fmt.Errorf("failed to create disk dir: %v", err)
            }
            diskDirs = append(diskDirs, diskDir)
            maxVolumes = append(maxVolumes, "5")
        }

        port := fmt.Sprintf("809%d", i)
        rack := fmt.Sprintf("rack%d", i)
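        // Passing comma-separated lists to -dir and -max is how weed volume
        // models one server with several disks: each directory is treated as
        // its own disk, and the matching -max entry caps its volume count.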
        volumeCmd := exec.CommandContext(ctx, weedBinary, "volume",
            "-port", port,
            "-dir", strings.Join(diskDirs, ","),
            "-max", strings.Join(maxVolumes, ","),
            "-mserver", "127.0.0.1:9334",
            "-ip", "127.0.0.1",
            "-dataCenter", "dc1",
            "-rack", rack,
        )

        // Create log file for this volume server
        logDir := filepath.Join(dataDir, fmt.Sprintf("server%d_logs", i))
        os.MkdirAll(logDir, 0755)
        volumeLogFile, err := os.Create(filepath.Join(logDir, "volume.log"))
        if err != nil {
            cluster.Stop()
            return nil, fmt.Errorf("failed to create volume log file: %v", err)
        }
        volumeCmd.Stdout = volumeLogFile
        volumeCmd.Stderr = volumeLogFile

        if err := volumeCmd.Start(); err != nil {
            cluster.Stop()
            return nil, fmt.Errorf("failed to start volume server %d: %v", i, err)
        }
        cluster.volumeServers = append(cluster.volumeServers, volumeCmd)
    }

    // Wait for volume servers to register with master
    // Multi-disk servers may take longer to initialize
    time.Sleep(8 * time.Second)

    return cluster, nil
}
// uploadTestDataToMaster uploads test data to a specific master address
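// The flow mirrors a normal SeaweedFS write: ask the master to assign a file
// id, then upload the bytes to the volume server URL from the assignment.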
func uploadTestDataToMaster(data []byte, masterAddress string) (needle.VolumeId, error) {
    assignResult, err := operation.Assign(context.Background(), func(ctx context.Context) pb.ServerAddress {
        return pb.ServerAddress(masterAddress)
    }, grpc.WithInsecure(), &operation.VolumeAssignRequest{
        Count:       1,
        Collection:  "test",
        Replication: "000",
    })
    if err != nil {
        return 0, err
    }

    uploader, err := operation.NewUploader()
    if err != nil {
        return 0, err
    }

    uploadResult, err, _ := uploader.Upload(context.Background(), bytes.NewReader(data), &operation.UploadOption{
        UploadUrl: "http://" + assignResult.Url + "/" + assignResult.Fid,
        Filename:  "testfile.txt",
        MimeType:  "text/plain",
    })
    if err != nil {
        return 0, err
    }
    if uploadResult.Error != "" {
        return 0, fmt.Errorf("upload error: %s", uploadResult.Error)
    }

    fid, err := needle.ParseFileIdFromString(assignResult.Fid)
    if err != nil {
        return 0, err
    }
    return fid.VolumeId, nil
}
// countShardsPerDisk counts EC shards on each disk of each server
// Returns map: "serverN" -> map[diskId]shardCount
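// EC shard files carry .ec00 through .ec13 extensions (with the .ecx index and
// .ecj journal alongside); counting them is delegated to the countECShardFiles
// helper, which is assumed to be defined elsewhere in this test file.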
func countShardsPerDisk(testDir string, volumeId uint32) map[string]map[int]int {
    result := make(map[string]map[int]int)
    const numServers = 3
    const disksPerServer = 4

    for server := 0; server < numServers; server++ {
        serverKey := fmt.Sprintf("server%d", server)
        result[serverKey] = make(map[int]int)
        for disk := 0; disk < disksPerServer; disk++ {
            diskDir := filepath.Join(testDir, fmt.Sprintf("server%d_disk%d", server, disk))
            count, err := countECShardFiles(diskDir, volumeId)
            if err == nil && count > 0 {
                result[serverKey][disk] = count
            }
        }
    }
    return result
}
// calculateDiskShardVariance measures how evenly shards are spread across disks.
// Despite the name, it returns the standard deviation of the per-disk shard
// counts; lower values mean a more even distribution.
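// Note that disks holding zero shards are excluded below, so a fully
// concentrated placement (all 14 shards on one disk) also scores 0; the metric
// only describes evenness among the disks that hold shards. For example,
// {2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1} across 12 disks scores about 0.37.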
func calculateDiskShardVariance(distribution map[string]map[int]int) float64 {
    var counts []float64
    for _, disks := range distribution {
        for _, count := range disks {
            if count > 0 {
                counts = append(counts, float64(count))
            }
        }
    }
    if len(counts) == 0 {
        return 0
    }

    // Calculate mean
    mean := 0.0
    for _, c := range counts {
        mean += c
    }
    mean /= float64(len(counts))

    // Calculate the variance, then return its square root (the standard deviation)
    variance := 0.0
    for _, c := range counts {
        variance += (c - mean) * (c - mean)
    }
    return math.Sqrt(variance / float64(len(counts)))
}