package topology

import (
	"fmt"
	"testing"

	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/sequence"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	testAssert "github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)
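
// Convention used throughout these tests: generation 0 doubles as the
// "legacy" marker. Pre-upgrade volume servers always report generation 0 in
// heartbeats, and pre-upgrade clients always request generation 0, which the
// lookup path resolves to the active generation when one has been set.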

// TestPreUpgradeNodeCompatibility tests that pre-upgrade nodes (without generation support)
// can continue working with the new generation-aware system
func TestPreUpgradeNodeCompatibility(t *testing.T) {
	t.Run("pre_upgrade_heartbeat_processing", func(t *testing.T) {
		// Test that heartbeats from pre-upgrade volume servers are processed correctly
		topo := NewTopology("test", sequence.NewMemorySequencer(), 32*1024, 5, false)
		volumeId := needle.VolumeId(456)

		// Simulate a heartbeat from a pre-upgrade volume server (generation=0)
		ecShardInfo := &master_pb.VolumeEcShardInformationMessage{
			Id:          uint32(volumeId),
			Collection:  "test",
			EcIndexBits: uint32(0x3FFF), // all 14 shards
			DiskType:    "hdd",
			Generation:  0, // pre-upgrade server sends generation 0
		}

		dc := topo.GetOrCreateDataCenter("dc1")
		rack := dc.GetOrCreateRack("rack1")
		dn := rack.GetOrCreateDataNode("server1", 8080, 0, "127.0.0.1", nil)

		// Process the heartbeat - it should work fine
		topo.SyncDataNodeEcShards([]*master_pb.VolumeEcShardInformationMessage{ecShardInfo}, dn)

		// Verify it was registered
		locations, found := topo.LookupEcShards(volumeId, 0)
		require.True(t, found, "Pre-upgrade server EC shards should be registered")
		testAssert.Equal(t, uint32(0), locations.Generation, "Should be registered as generation 0")

		t.Logf("✅ Pre-upgrade server heartbeat processed: volume %d generation %d",
			volumeId, locations.Generation)
	})
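
	// Note: heartbeats carry the shard set as EcIndexBits on the wire, while
	// the registration path in the tests below uses erasure_coding.ShardBits;
	// both appear to follow the same 14-bit mask convention (0x3FFF meaning
	// all shards in each).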
t.Run("pre_upgrade_lookup_fallback", func(t *testing.T) {
// Test that pre-upgrade clients can lookup volumes using generation 0
topo := NewTopology("test", sequence.NewMemorySequencer(), 32*1024, 5, false)
volumeId := needle.VolumeId(123)
// Register generation 2 shards
ecInfo := &erasure_coding.EcVolumeInfo{
VolumeId: volumeId,
Collection: "test",
ShardBits: erasure_coding.ShardBits(0x3FFF), // all 14 shards
Generation: 2,
}
dc := topo.GetOrCreateDataCenter("dc1")
rack := dc.GetOrCreateRack("rack1")
dn := rack.GetOrCreateDataNode("server1", 8080, 0, "127.0.0.1", nil)
topo.RegisterEcShards(ecInfo, dn)
// Set generation 2 as active
topo.SetEcActiveGeneration(volumeId, 2)
// Pre-upgrade client looks up with generation 0
locations, actualGen, found := topo.LookupEcShardsWithFallback(volumeId, 0)
require.True(t, found, "Pre-upgrade client should find EC volume")
testAssert.Equal(t, uint32(2), actualGen, "Should return active generation")
testAssert.Equal(t, uint32(2), locations.Generation, "Locations should be for active generation")
t.Logf("✅ Pre-upgrade client lookup: requested gen=0, got active gen=%d", actualGen)
})
}

// TestPostUpgradeNodeCompatibility tests that post-upgrade nodes (with generation support)
// can handle legacy data from pre-upgrade nodes
func TestPostUpgradeNodeCompatibility(t *testing.T) {
	t.Run("post_upgrade_handles_legacy_data", func(t *testing.T) {
		// Test that new generation-aware nodes can handle legacy generation 0 data
		topo := NewTopology("test", sequence.NewMemorySequencer(), 32*1024, 5, false)
		volumeId := needle.VolumeId(789)

		// Register a legacy generation 0 EC volume (from a pre-upgrade node)
		legacyEcInfo := &erasure_coding.EcVolumeInfo{
			VolumeId:   volumeId,
			Collection: "test",
			ShardBits:  erasure_coding.ShardBits(0x3FFF),
			Generation: 0, // legacy data
		}

		dc := topo.GetOrCreateDataCenter("dc1")
		rack := dc.GetOrCreateRack("rack1")
		dn := rack.GetOrCreateDataNode("server1", 8080, 0, "127.0.0.1", nil)
		topo.RegisterEcShards(legacyEcInfo, dn)

		// A post-upgrade client with generation support looks up the volume.
		// When no active generation is set, it should fall back to whatever is available.
		locations, actualGen, found := topo.LookupEcShardsWithFallback(volumeId, 0)
		require.True(t, found, "Post-upgrade node should find legacy data")
		testAssert.Equal(t, uint32(0), actualGen, "Should return generation 0 for legacy data")
		testAssert.Equal(t, uint32(0), locations.Generation, "Locations should be generation 0")

		t.Logf("✅ Post-upgrade node handles legacy data: found gen=%d", actualGen)
	})

	t.Run("post_upgrade_prefers_active_generation", func(t *testing.T) {
		// Test that post-upgrade nodes prefer the active generation over legacy data
		topo := NewTopology("test", sequence.NewMemorySequencer(), 32*1024, 5, false)
		volumeId := needle.VolumeId(999)

		dc := topo.GetOrCreateDataCenter("dc1")
		rack := dc.GetOrCreateRack("rack1")
		dn := rack.GetOrCreateDataNode("server1", 8080, 0, "127.0.0.1", nil)

		// Register both legacy (gen 0) and new (gen 1) data
		legacyEcInfo := &erasure_coding.EcVolumeInfo{
			VolumeId:   volumeId,
			Collection: "test",
			ShardBits:  erasure_coding.ShardBits(0x3FFF),
			Generation: 0,
		}
		topo.RegisterEcShards(legacyEcInfo, dn)

		newEcInfo := &erasure_coding.EcVolumeInfo{
			VolumeId:   volumeId,
			Collection: "test",
			ShardBits:  erasure_coding.ShardBits(0x3FFF),
			Generation: 1,
		}
		topo.RegisterEcShards(newEcInfo, dn)

		// Set generation 1 as active
		topo.SetEcActiveGeneration(volumeId, 1)

		// A post-upgrade client lookup should prefer the active generation
		locations, actualGen, found := topo.LookupEcShardsWithFallback(volumeId, 0)
		require.True(t, found, "Should find volume")
		testAssert.Equal(t, uint32(1), actualGen, "Should prefer active generation over legacy")
		testAssert.Equal(t, uint32(1), locations.Generation, "Locations should be active generation")

		t.Logf("✅ Post-upgrade node prefers active: legacy=0, active=1, returned=%d", actualGen)
	})

	t.Run("post_upgrade_strict_generation_requests", func(t *testing.T) {
		// Test that post-upgrade clients can make strict generation requests
		topo := NewTopology("test", sequence.NewMemorySequencer(), 32*1024, 5, false)
		volumeId := needle.VolumeId(555)

		dc := topo.GetOrCreateDataCenter("dc1")
		rack := dc.GetOrCreateRack("rack1")
		dn := rack.GetOrCreateDataNode("server1", 8080, 0, "127.0.0.1", nil)

		// Register multiple generations
		for gen := uint32(0); gen <= 2; gen++ {
			ecInfo := &erasure_coding.EcVolumeInfo{
				VolumeId:   volumeId,
				Collection: "test",
				ShardBits:  erasure_coding.ShardBits(0x3FFF),
				Generation: gen,
			}
			topo.RegisterEcShards(ecInfo, dn)
		}
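
		// Quick sanity check that all three generations are now registered
		// side by side, before the lookup paths are exercised below.
		for gen := uint32(0); gen <= 2; gen++ {
			_, ok := topo.LookupEcShards(volumeId, gen)
			require.True(t, ok, "generation %d should be registered", gen)
		}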

		// Test strict generation requests
		for requestedGen := uint32(0); requestedGen <= 2; requestedGen++ {
			locations, actualGen, found := topo.LookupEcShardsWithFallback(volumeId, requestedGen)
			if requestedGen == 0 {
				// Generation 0 requests use the active generation logic
				require.True(t, found, "Generation 0 request should find volume")
			} else {
				// Specific generation requests should return an exact match
				require.True(t, found, "Specific generation request should find exact match")
				testAssert.Equal(t, requestedGen, actualGen, "Should return exact requested generation")
				testAssert.Equal(t, requestedGen, locations.Generation, "Locations should match requested generation")
			}
		}

		t.Logf("✅ Post-upgrade strict requests work for all generations")
	})
}

// TestMixedClusterOperations tests operations in a mixed cluster
// where some nodes are pre-upgrade and some are post-upgrade
func TestMixedClusterOperations(t *testing.T) {
	t.Run("mixed_cluster_shard_distribution", func(t *testing.T) {
		// Test that EC shards can be distributed across mixed-version nodes
		topo := NewTopology("test", sequence.NewMemorySequencer(), 32*1024, 5, false)
		volumeId := needle.VolumeId(777)

		dc := topo.GetOrCreateDataCenter("dc1")
		rack := dc.GetOrCreateRack("rack1")

		// Pre-upgrade node (sends generation 0)
		preUpgradeNode := rack.GetOrCreateDataNode("pre-upgrade", 8080, 0, "127.0.0.1", nil)
		// Post-upgrade node (sends a specific generation)
		postUpgradeNode := rack.GetOrCreateDataNode("post-upgrade", 8081, 0, "127.0.0.2", nil)

		// Pre-upgrade node reports shards with generation 0
		preUpgradeShards := &erasure_coding.EcVolumeInfo{
			VolumeId:   volumeId,
			Collection: "test",
			ShardBits:  erasure_coding.ShardBits(0x1FF), // shards 0-8
			Generation: 0,
		}
		topo.RegisterEcShards(preUpgradeShards, preUpgradeNode)

		// Post-upgrade node reports shards with generation 1
		postUpgradeShards := &erasure_coding.EcVolumeInfo{
			VolumeId:   volumeId,
			Collection: "test",
			ShardBits:  erasure_coding.ShardBits(0x3E00), // shards 9-13
			Generation: 1,
		}
		topo.RegisterEcShards(postUpgradeShards, postUpgradeNode)
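
		// Sanity check on the two masks above: 0x1FF covers shards 0-8
		// (9 bits) and 0x3E00 covers shards 9-13 (5 bits); together they
		// form 0x3FFF (all 14 shards) with no overlap.
		testAssert.Equal(t, uint32(0x3FFF), uint32(0x1FF|0x3E00), "masks should combine to all 14 shards")
		testAssert.Equal(t, uint32(0), uint32(0x1FF&0x3E00), "masks should not overlap")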

		// Verify both generations are registered
		gen0Locations, found0 := topo.LookupEcShards(volumeId, 0)
		gen1Locations, found1 := topo.LookupEcShards(volumeId, 1)
		require.True(t, found0, "Generation 0 shards should be registered")
		require.True(t, found1, "Generation 1 shards should be registered")

		gen0ShardCount := countShards(gen0Locations)
		gen1ShardCount := countShards(gen1Locations)
		testAssert.Equal(t, 9, gen0ShardCount, "Pre-upgrade node should have 9 shards")
		testAssert.Equal(t, 5, gen1ShardCount, "Post-upgrade node should have 5 shards")

		t.Logf("✅ Mixed cluster shard distribution: gen0=%d shards, gen1=%d shards",
			gen0ShardCount, gen1ShardCount)
	})
}

// TestRollingUpgradeScenarios tests specific rolling upgrade scenarios
func TestRollingUpgradeScenarios(t *testing.T) {
	t.Run("rolling_upgrade_sequence", func(t *testing.T) {
		// Test the complete rolling upgrade sequence
		topo := NewTopology("test", sequence.NewMemorySequencer(), 32*1024, 5, false)
		volumeId := needle.VolumeId(123)

		dc := topo.GetOrCreateDataCenter("dc1")
		rack := dc.GetOrCreateRack("rack1")

		// Create 6 nodes representing a cluster during a rolling upgrade
		nodes := make([]*DataNode, 6)
		for i := 0; i < 6; i++ {
			nodes[i] = rack.GetOrCreateDataNode(fmt.Sprintf("node%d", i), 8080+i, 0, fmt.Sprintf("127.0.0.%d", i+1), nil)
		}

		// Phase 1: all nodes are pre-upgrade (generation 0)
		t.Run("phase1_all_pre_upgrade", func(t *testing.T) {
			for i, node := range nodes {
				ecInfo := &erasure_coding.EcVolumeInfo{
					VolumeId:   volumeId,
					Collection: "test",
					ShardBits:  erasure_coding.ShardBits(1 << i), // each node holds one shard
					Generation: 0,
				}
				topo.RegisterEcShards(ecInfo, node)
			}

			// Verify all shards are generation 0
			locations, found := topo.LookupEcShards(volumeId, 0)
			require.True(t, found, "Should find generation 0 volume")
			testAssert.Equal(t, 6, countShards(locations), "Should have 6 shards")

			t.Logf("✅ Phase 1: All 6 nodes running pre-upgrade with generation 0")
		})

		// Phase 2: partially upgraded cluster (3 nodes upgraded)
		t.Run("phase2_partial_upgrade", func(t *testing.T) {
			// Nodes 3-5 are upgraded and now understand generations.
			// They re-register their shards as generation 1.
			for i := 3; i < 6; i++ {
				// Unregister the old generation 0 shard
				oldEcInfo := &erasure_coding.EcVolumeInfo{
					VolumeId:   volumeId,
					Collection: "test",
					ShardBits:  erasure_coding.ShardBits(1 << i),
					Generation: 0,
				}
				topo.UnRegisterEcShards(oldEcInfo, nodes[i])

				// Register the new generation 1 shard
				newEcInfo := &erasure_coding.EcVolumeInfo{
					VolumeId:   volumeId,
					Collection: "test",
					ShardBits:  erasure_coding.ShardBits(1 << i),
					Generation: 1,
				}
				topo.RegisterEcShards(newEcInfo, nodes[i])
			}

			// Verify the generations are mixed
			gen0Locations, found0 := topo.LookupEcShards(volumeId, 0)
			gen1Locations, found1 := topo.LookupEcShards(volumeId, 1)
			require.True(t, found0, "Should still have generation 0 shards")
			require.True(t, found1, "Should have generation 1 shards")
			testAssert.Equal(t, 3, countShards(gen0Locations), "Should have 3 gen 0 shards")
			testAssert.Equal(t, 3, countShards(gen1Locations), "Should have 3 gen 1 shards")

			t.Logf("✅ Phase 2: Mixed cluster - 3 nodes gen 0, 3 nodes gen 1")
		})

		// Phase 3: fully upgraded cluster
		t.Run("phase3_full_upgrade", func(t *testing.T) {
			// The remaining nodes 0-2 are upgraded
			for i := 0; i < 3; i++ {
				// Unregister the old generation 0 shard
				oldEcInfo := &erasure_coding.EcVolumeInfo{
					VolumeId:   volumeId,
					Collection: "test",
					ShardBits:  erasure_coding.ShardBits(1 << i),
					Generation: 0,
				}
				topo.UnRegisterEcShards(oldEcInfo, nodes[i])

				// Register the new generation 1 shard
				newEcInfo := &erasure_coding.EcVolumeInfo{
					VolumeId:   volumeId,
					Collection: "test",
					ShardBits:  erasure_coding.ShardBits(1 << i),
					Generation: 1,
				}
				topo.RegisterEcShards(newEcInfo, nodes[i])
			}

			// Set generation 1 as active
			topo.SetEcActiveGeneration(volumeId, 1)

			// Verify only generation 1 remains
			_, found0 := topo.LookupEcShards(volumeId, 0)
			gen1Locations, found1 := topo.LookupEcShards(volumeId, 1)
			testAssert.False(t, found0, "Should no longer have generation 0 shards")
			require.True(t, found1, "Should have generation 1 shards")
			testAssert.Equal(t, 6, countShards(gen1Locations), "Should have all 6 gen 1 shards")

			// Lookups should now prefer generation 1
			_, actualGen, found := topo.LookupEcShardsWithFallback(volumeId, 0)
			require.True(t, found, "Should find volume")
			testAssert.Equal(t, uint32(1), actualGen, "Should return active generation 1")

			t.Logf("✅ Phase 3: All nodes upgraded to generation 1, old generation cleaned up")
		})
	})
}

// TestGenerationCompatibilityMatrix tests all combinations of client/server generations
func TestGenerationCompatibilityMatrix(t *testing.T) {
	// Test matrix of generation compatibility for various upgrade scenarios
	testCases := []struct {
		name               string
		clientType         string
		serverGeneration   uint32
		requestGeneration  uint32
		shouldBeCompatible bool
		description        string
	}{
		{
			name:               "pre_client_to_pre_server",
			clientType:         "pre-upgrade",
			serverGeneration:   0,
			requestGeneration:  0,
			shouldBeCompatible: true,
			description:        "Pre-upgrade client to pre-upgrade server",
		},
		{
			name:               "pre_client_to_post_server_gen1",
			clientType:         "pre-upgrade",
			serverGeneration:   1,
			requestGeneration:  0,
			shouldBeCompatible: true,
			description:        "Pre-upgrade client to generation 1 server",
		},
		{
			name:               "pre_client_to_post_server_gen2",
			clientType:         "pre-upgrade",
			serverGeneration:   2,
			requestGeneration:  0,
			shouldBeCompatible: true,
			description:        "Pre-upgrade client to generation 2 server",
		},
		{
			name:               "post_client_exact_match",
			clientType:         "post-upgrade",
			serverGeneration:   1,
			requestGeneration:  1,
			shouldBeCompatible: true,
			description:        "Post-upgrade client exact generation match",
		},
		{
			name:               "post_client_strict_mismatch",
			clientType:         "post-upgrade",
			serverGeneration:   0,
			requestGeneration:  1,
			shouldBeCompatible: false,
			description:        "Post-upgrade client strict mismatch",
		},
		{
			name:               "post_client_legacy_request",
			clientType:         "post-upgrade",
			serverGeneration:   1,
			requestGeneration:  0,
			shouldBeCompatible: true,
			description:        "Post-upgrade client with legacy request",
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Exercise the local isGenerationCompatible helper below, which
			// mirrors the logic in volume_grpc_erasure_coding.go
			compatible := isGenerationCompatible(tc.serverGeneration, tc.requestGeneration)
			testAssert.Equal(t, tc.shouldBeCompatible, compatible, tc.description)

			if compatible {
				t.Logf("✅ %s: server_gen=%d, request_gen=%d → COMPATIBLE",
					tc.description, tc.serverGeneration, tc.requestGeneration)
			} else {
				t.Logf("❌ %s: server_gen=%d, request_gen=%d → INCOMPATIBLE",
					tc.description, tc.serverGeneration, tc.requestGeneration)
			}
		})
	}
}

// countShards counts the number of distinct shards present in an EcShardLocations
func countShards(locations *EcShardLocations) int {
	count := 0
	for i := 0; i < erasure_coding.TotalShardsCount; i++ {
		if len(locations.Locations[i]) > 0 {
			count++
		}
	}
	return count
}

// isGenerationCompatible simulates the helper of the same name in
// volume_grpc_erasure_coding.go
func isGenerationCompatible(actualGeneration, requestedGeneration uint32) bool {
	// An exact match is always compatible
	if actualGeneration == requestedGeneration {
		return true
	}

	// Mixed-version compatibility: if the client requests generation 0
	// (default/legacy), allow access to any generation for backward compatibility
	if requestedGeneration == 0 {
		return true
	}

	// If the client requests a specific generation but the volume has a different
	// generation, this is not compatible (strict generation matching)
	return false
}
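
// For reference, the rules above in table form:
//
//	actual gen   requested gen    compatible?
//	    n             n           yes (exact match)
//	    n             0           yes (legacy request accepts any generation)
//	    n          m (m>0, m!=n)  no  (strict generation matching)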