
Merge branch 'master' of https://github.com/seaweedfs/seaweedfs

pull/6065/merge
Chris Lu · 3 days ago · parent commit a5474c30d1
  1. weed/storage/store_ec.go (29 changed lines)
  2. weed/storage/store_ec_recovery_test.go (422 changed lines)

weed/storage/store_ec.go (29 changed lines)

@@ -390,9 +390,32 @@ func (s *Store) recoverOneRemoteEcShardInterval(needleId types.NeedleId, ecVolum
 	wg.Wait()
-	if err = enc.ReconstructData(bufs); err != nil {
-		glog.V(3).Infof("recovered ec shard %d.%d failed: %v", ecVolume.VolumeId, shardIdToRecover, err)
-		return 0, false, err
+	// Count and log available shards for diagnostics
+	availableShards := make([]erasure_coding.ShardId, 0, erasure_coding.TotalShardsCount)
+	missingShards := make([]erasure_coding.ShardId, 0, erasure_coding.ParityShardsCount+1)
+	for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
+		if bufs[shardId] != nil {
+			availableShards = append(availableShards, erasure_coding.ShardId(shardId))
+		} else {
+			missingShards = append(missingShards, erasure_coding.ShardId(shardId))
+		}
+	}
+	glog.V(3).Infof("recover ec shard %d.%d: %d shards available %v, %d missing %v",
+		ecVolume.VolumeId, shardIdToRecover,
+		len(availableShards), availableShards,
+		len(missingShards), missingShards)
+	if len(availableShards) < erasure_coding.DataShardsCount {
+		return 0, false, fmt.Errorf("cannot recover shard %d.%d: only %d shards available %v, need at least %d (missing: %v)",
+			ecVolume.VolumeId, shardIdToRecover,
+			len(availableShards), availableShards,
+			erasure_coding.DataShardsCount, missingShards)
+	}
+	if err = enc.ReconstructData(bufs[:erasure_coding.TotalShardsCount]); err != nil {
+		return 0, false, fmt.Errorf("failed to reconstruct data for shard %d.%d with %d available shards %v: %w",
+			ecVolume.VolumeId, shardIdToRecover, len(availableShards), availableShards, err)
 	}
 	glog.V(4).Infof("recovered ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
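
Why the new call slices bufs[:erasure_coding.TotalShardsCount]: the buffer array in store_ec.go is sized to MaxShardCount, while a klauspost/reedsolomon encoder built for DataShardsCount+ParityShardsCount shards expects exactly that many shard slots. Below is a minimal standalone sketch of the difference, not part of this commit; the 10/4/32 constants are assumptions that mirror SeaweedFS's usual 10+4 layout and 32-entry buffer array.

	package main

	import (
		"fmt"

		"github.com/klauspost/reedsolomon"
	)

	func main() {
		// Assumed constants mirroring erasure_coding.DataShardsCount,
		// ParityShardsCount, and MaxShardCount.
		const dataShards, parityShards, maxShardCount = 10, 4, 32
		totalShards := dataShards + parityShards

		enc, err := reedsolomon.New(dataShards, parityShards)
		if err != nil {
			panic(err)
		}

		// Allocate the buffer array the way store_ec.go does: MaxShardCount
		// entries, only the first TotalShardsCount of which hold real shards.
		bufs := make([][]byte, maxShardCount)
		for i := 0; i < totalShards; i++ {
			bufs[i] = make([]byte, 1024)
		}
		if err := enc.Encode(bufs[:totalShards]); err != nil {
			panic(err)
		}

		bufs[5] = nil // simulate a lost data shard

		// Passing the full 32-entry slice: the encoder sees a shard count that
		// does not match its 10+4 configuration, so this call is expected to fail.
		fmt.Println("full buffer array:", enc.ReconstructData(bufs))

		// Passing only the first TotalShardsCount entries lets reconstruction succeed.
		fmt.Println("sliced to total shards:", enc.ReconstructData(bufs[:totalShards]))
	}

The TestRecoverOneRemoteEcShardInterval_ReconstructDataSlicing test in the new file below exercises the same slicing behavior against the real erasure_coding constants.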

weed/storage/store_ec_recovery_test.go (422 changed lines)

@@ -0,0 +1,422 @@
package storage

import (
	"fmt"
	"strings"
	"sync"
	"testing"

	"github.com/klauspost/reedsolomon"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)

// mockEcVolume creates a mock EC volume for testing
func mockEcVolume(volumeId needle.VolumeId, shardLocations map[erasure_coding.ShardId][]pb.ServerAddress) *erasure_coding.EcVolume {
	ecVolume := &erasure_coding.EcVolume{
		VolumeId:       volumeId,
		ShardLocations: shardLocations,
	}
	return ecVolume
}

// TestRecoverOneRemoteEcShardInterval_SufficientShards tests successful recovery with enough shards
func TestRecoverOneRemoteEcShardInterval_SufficientShards(t *testing.T) {
	// This test simulates the improved diagnostics when there are sufficient shards.
	// We can't easily test the full recovery without mocking the network calls,
	// but we can validate the logic for counting available shards.
	shardIdToRecover := erasure_coding.ShardId(5)

	// Create shard locations with all shards except the one to recover
	shardLocations := make(map[erasure_coding.ShardId][]pb.ServerAddress)
	for i := 0; i < erasure_coding.TotalShardsCount; i++ {
		if i != int(shardIdToRecover) {
			shardLocations[erasure_coding.ShardId(i)] = []pb.ServerAddress{"localhost:8080"}
		}
	}

	// Verify we have enough shards for recovery
	availableCount := 0
	for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
		if shardId != int(shardIdToRecover) && len(shardLocations[erasure_coding.ShardId(shardId)]) > 0 {
			availableCount++
		}
	}

	if availableCount < erasure_coding.DataShardsCount {
		t.Errorf("Expected at least %d shards, got %d", erasure_coding.DataShardsCount, availableCount)
	}

	t.Logf("Successfully identified %d available shards (need %d)", availableCount, erasure_coding.DataShardsCount)
}

// TestRecoverOneRemoteEcShardInterval_InsufficientShards tests recovery failure with too few shards
func TestRecoverOneRemoteEcShardInterval_InsufficientShards(t *testing.T) {
	shardIdToRecover := erasure_coding.ShardId(5)

	// Create shard locations with fewer than DataShardsCount=10 shards
	shardLocations := make(map[erasure_coding.ShardId][]pb.ServerAddress)
	for i := 0; i < 8; i++ {
		if i != int(shardIdToRecover) {
			shardLocations[erasure_coding.ShardId(i)] = []pb.ServerAddress{"localhost:8080"}
		}
	}

	// Count available shards
	availableCount := 0
	for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
		if len(shardLocations[erasure_coding.ShardId(shardId)]) > 0 {
			availableCount++
		}
	}

	// Verify we don't have enough shards
	if availableCount >= erasure_coding.DataShardsCount {
		t.Errorf("Test setup error: expected less than %d shards, got %d", erasure_coding.DataShardsCount, availableCount)
	}

	t.Logf("Correctly identified insufficient shards: %d available (need %d)", availableCount, erasure_coding.DataShardsCount)
}

// TestRecoverOneRemoteEcShardInterval_ShardCounting tests the shard counting logic
func TestRecoverOneRemoteEcShardInterval_ShardCounting(t *testing.T) {
	tests := []struct {
		name                string
		totalShards         int
		shardToRecover      int
		expectSufficientFor bool
	}{
		{
			name:                "All shards available except one",
			totalShards:         erasure_coding.TotalShardsCount - 1,
			shardToRecover:      5,
			expectSufficientFor: true,
		},
		{
			name:                "Exactly minimum shards (DataShardsCount)",
			totalShards:         erasure_coding.DataShardsCount,
			shardToRecover:      13,
			expectSufficientFor: true,
		},
		{
			name:                "One less than minimum",
			totalShards:         erasure_coding.DataShardsCount - 1,
			shardToRecover:      10,
			expectSufficientFor: false,
		},
		{
			name:                "Only half the shards",
			totalShards:         erasure_coding.TotalShardsCount / 2,
			shardToRecover:      0,
			expectSufficientFor: false,
		},
		{
			name:                "All data shards available",
			totalShards:         erasure_coding.DataShardsCount,
			shardToRecover:      11, // Recovering a parity shard
			expectSufficientFor: true,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Simulate the bufs array that would be populated
			bufs := make([][]byte, erasure_coding.MaxShardCount)

			// Fill in available shards (excluding the one to recover)
			shardCount := 0
			for i := 0; i < erasure_coding.TotalShardsCount && shardCount < tt.totalShards; i++ {
				if i != tt.shardToRecover {
					bufs[i] = make([]byte, 1024) // dummy data
					shardCount++
				}
			}

			// Count available and missing shards (mimicking the corrected code)
			availableShards := make([]erasure_coding.ShardId, 0, erasure_coding.TotalShardsCount)
			missingShards := make([]erasure_coding.ShardId, 0, erasure_coding.ParityShardsCount+1)
			for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
				if bufs[shardId] != nil {
					availableShards = append(availableShards, erasure_coding.ShardId(shardId))
				} else {
					missingShards = append(missingShards, erasure_coding.ShardId(shardId))
				}
			}

			// Verify the count matches expectations
			hasSufficient := len(availableShards) >= erasure_coding.DataShardsCount
			if hasSufficient != tt.expectSufficientFor {
				t.Errorf("Expected sufficient=%v, got sufficient=%v (available=%d, need=%d)",
					tt.expectSufficientFor, hasSufficient, len(availableShards), erasure_coding.DataShardsCount)
			}

			t.Logf("Available shards: %d %v, Missing shards: %d %v",
				len(availableShards), availableShards,
				len(missingShards), missingShards)
		})
	}
}

// TestRecoverOneRemoteEcShardInterval_ErrorMessage tests the improved error messages
func TestRecoverOneRemoteEcShardInterval_ErrorMessage(t *testing.T) {
	volumeId := needle.VolumeId(42)
	shardIdToRecover := erasure_coding.ShardId(7)

	// Simulate insufficient shards scenario
	availableShards := []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6}
	missingShards := []erasure_coding.ShardId{7, 8, 9, 10, 11, 12, 13}

	// Verify error message contains all required information
	expectedErr := fmt.Errorf("cannot recover shard %d.%d: only %d shards available %v, need at least %d (missing: %v)",
		volumeId, shardIdToRecover,
		len(availableShards), availableShards,
		erasure_coding.DataShardsCount, missingShards)

	errMsg := expectedErr.Error()

	// Check that error message contains key information
	if !strings.Contains(errMsg, fmt.Sprintf("shard %d.%d", volumeId, shardIdToRecover)) {
		t.Errorf("Error message missing volume.shard identifier")
	}
	if !strings.Contains(errMsg, fmt.Sprintf("%d shards available", len(availableShards))) {
		t.Errorf("Error message missing available shard count")
	}
	if !strings.Contains(errMsg, fmt.Sprintf("need at least %d", erasure_coding.DataShardsCount)) {
		t.Errorf("Error message missing required shard count")
	}

	t.Logf("Error message format validated: %s", errMsg)
}

// TestRecoverOneRemoteEcShardInterval_ReconstructDataSlicing tests the buffer slicing fix
func TestRecoverOneRemoteEcShardInterval_ReconstructDataSlicing(t *testing.T) {
	// This test validates that we pass bufs[:TotalShardsCount] to ReconstructData
	// instead of the full bufs array which could be MaxShardCount (32)
	enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
	if err != nil {
		t.Fatalf("Failed to create encoder: %v", err)
	}

	// Create test data
	shardSize := 1024
	bufs := make([][]byte, erasure_coding.MaxShardCount)

	// Fill data shards
	for i := 0; i < erasure_coding.DataShardsCount; i++ {
		bufs[i] = make([]byte, shardSize)
		for j := range bufs[i] {
			bufs[i][j] = byte(i + j)
		}
	}

	// Allocate parity shard buffers (zero-filled; Encode populates them below)
	for i := erasure_coding.DataShardsCount; i < erasure_coding.TotalShardsCount; i++ {
		bufs[i] = make([]byte, shardSize)
	}

	// Encode to generate parity
	if err := enc.Encode(bufs[:erasure_coding.TotalShardsCount]); err != nil {
		t.Fatalf("Failed to encode: %v", err)
	}

	// Simulate loss of shard 5
	originalShard5 := make([]byte, shardSize)
	copy(originalShard5, bufs[5])
	bufs[5] = nil

	// Reconstruct using only TotalShardsCount elements (not MaxShardCount)
	if err := enc.ReconstructData(bufs[:erasure_coding.TotalShardsCount]); err != nil {
		t.Fatalf("Failed to reconstruct data: %v", err)
	}

	// Verify shard 5 was recovered correctly
	if bufs[5] == nil {
		t.Errorf("Shard 5 was not recovered")
	} else {
		for i := range originalShard5 {
			if originalShard5[i] != bufs[5][i] {
				t.Errorf("Recovered shard 5 data mismatch at byte %d: expected %d, got %d",
					i, originalShard5[i], bufs[5][i])
				break
			}
		}
	}

	t.Logf("Successfully reconstructed shard with proper buffer slicing")
}

// TestRecoverOneRemoteEcShardInterval_ParityShardRecovery tests recovering parity shards
func TestRecoverOneRemoteEcShardInterval_ParityShardRecovery(t *testing.T) {
	// Parity shards (10-13) should be recoverable with all data shards (0-9)
	enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
	if err != nil {
		t.Fatalf("Failed to create encoder: %v", err)
	}

	shardSize := 512
	bufs := make([][]byte, erasure_coding.TotalShardsCount)

	// Fill all shards initially
	for i := 0; i < erasure_coding.TotalShardsCount; i++ {
		bufs[i] = make([]byte, shardSize)
		for j := range bufs[i] {
			bufs[i][j] = byte(i * j)
		}
	}

	// Encode
	if err := enc.Encode(bufs); err != nil {
		t.Fatalf("Failed to encode: %v", err)
	}

	// Test recovering each parity shard
	for parityShard := erasure_coding.DataShardsCount; parityShard < erasure_coding.TotalShardsCount; parityShard++ {
		t.Run(fmt.Sprintf("RecoverParity%d", parityShard), func(t *testing.T) {
			testBufs := make([][]byte, erasure_coding.TotalShardsCount)
			for i := range testBufs {
				if i != parityShard {
					testBufs[i] = make([]byte, shardSize)
					copy(testBufs[i], bufs[i])
				}
			}

			// Reconstruct (handles both data and parity)
			if err := enc.Reconstruct(testBufs); err != nil {
				t.Errorf("Failed to reconstruct parity shard %d: %v", parityShard, err)
			}

			// Verify
			if testBufs[parityShard] == nil {
				t.Errorf("Parity shard %d was not recovered", parityShard)
			}
		})
	}
}

// TestRecoverOneRemoteEcShardInterval_ConcurrentShardReading tests the concurrent shard reading
func TestRecoverOneRemoteEcShardInterval_ConcurrentShardReading(t *testing.T) {
	// Simulate the concurrent reading pattern in recoverOneRemoteEcShardInterval
	shardIdToRecover := erasure_coding.ShardId(7)

	shardLocations := make(map[erasure_coding.ShardId][]pb.ServerAddress)
	for i := 0; i < erasure_coding.TotalShardsCount; i++ {
		if i != int(shardIdToRecover) {
			shardLocations[erasure_coding.ShardId(i)] = []pb.ServerAddress{"server1:8080"}
		}
	}

	// Simulate concurrent shard reading
	bufs := make([][]byte, erasure_coding.MaxShardCount)
	var wg sync.WaitGroup
	var mu sync.Mutex
	readErrors := make(map[erasure_coding.ShardId]error)

	for shardId, locations := range shardLocations {
		if shardId == shardIdToRecover {
			continue
		}
		if len(locations) == 0 {
			continue
		}

		wg.Add(1)
		go func(sid erasure_coding.ShardId) {
			defer wg.Done()

			// Simulate successful read
			data := make([]byte, 1024)
			for i := range data {
				data[i] = byte(sid)
			}

			mu.Lock()
			bufs[sid] = data
			mu.Unlock()
		}(shardId)
	}

	wg.Wait()

	// Count available shards
	availableCount := 0
	for i := 0; i < erasure_coding.TotalShardsCount; i++ {
		if bufs[i] != nil {
			availableCount++
		}
	}

	expectedCount := erasure_coding.TotalShardsCount - 1 // All except the one to recover
	if availableCount != expectedCount {
		t.Errorf("Expected %d shards to be read, got %d", expectedCount, availableCount)
	}

	// Verify no errors occurred
	if len(readErrors) > 0 {
		t.Errorf("Unexpected read errors: %v", readErrors)
	}

	t.Logf("Successfully simulated concurrent reading of %d shards", availableCount)
}

// TestRecoverOneRemoteEcShardInterval_BuggyMaxShardCount tests the fix for the bug where
// buffers beyond TotalShardsCount were incorrectly counted as available
func TestRecoverOneRemoteEcShardInterval_BuggyMaxShardCount(t *testing.T) {
	// This test would have failed with the original buggy code that iterated up to MaxShardCount
	// The bug: if bufs[14..31] had non-nil values, they would be counted as "available"
	// even though they should be ignored (only indices 0-13 matter for TotalShardsCount=14)
	bufs := make([][]byte, erasure_coding.MaxShardCount)

	// Set up only 9 valid shards (less than DataShardsCount=10)
	for i := 0; i < 9; i++ {
		bufs[i] = make([]byte, 1024)
	}

	// CRITICAL: Set garbage data in indices beyond TotalShardsCount
	// The buggy code would count these, making it think we have enough shards
	for i := erasure_coding.TotalShardsCount; i < erasure_coding.MaxShardCount; i++ {
		bufs[i] = make([]byte, 1024) // This should be IGNORED
	}

	// Count using the CORRECTED logic (should only check 0..TotalShardsCount-1)
	availableShards := make([]erasure_coding.ShardId, 0, erasure_coding.TotalShardsCount)
	missingShards := make([]erasure_coding.ShardId, 0, erasure_coding.ParityShardsCount+1)
	for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
		if bufs[shardId] != nil {
			availableShards = append(availableShards, erasure_coding.ShardId(shardId))
		} else {
			missingShards = append(missingShards, erasure_coding.ShardId(shardId))
		}
	}

	// With corrected code: should have 9 available shards (insufficient)
	if len(availableShards) != 9 {
		t.Errorf("Expected 9 available shards, got %d", len(availableShards))
	}
	if len(availableShards) >= erasure_coding.DataShardsCount {
		t.Errorf("CRITICAL BUG: Incorrectly counted buffers beyond TotalShardsCount as available!")
	}

	// Count using the BUGGY logic (what the old code did)
	buggyAvailableCount := 0
	for shardId := 0; shardId < erasure_coding.MaxShardCount; shardId++ {
		if bufs[shardId] != nil {
			buggyAvailableCount++
		}
	}

	// The buggy code would have counted 9 + 18 = 27 shards (WRONG!)
	if buggyAvailableCount != 27 {
		t.Errorf("Expected buggy logic to count 27 shards, got %d", buggyAvailableCount)
	}

	t.Logf("✅ Corrected code: %d shards (correct, insufficient)", len(availableShards))
	t.Logf("❌ Buggy code would have counted: %d shards (incorrect, falsely sufficient)", buggyAvailableCount)
	t.Logf("Missing shards: %v", missingShards)
}