From 04bc261f9b0b2b27acce84c64c7eaf26d226c90f Mon Sep 17 00:00:00 2001 From: pingqiu Date: Tue, 31 Mar 2026 13:38:30 -0700 Subject: [PATCH] fix: deliver assignment intent to real engine orchestrator, not discard MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Finding 1: ProcessAssignments now calls v2Orchestrator.ProcessAssignment - BlockService.v2Orchestrator field (RecoveryOrchestrator) - ProcessAssignment result logged at glog V(1) - No more `_ = intent` — engine state actually changes Finding 2: localServerID documented as interim - BlockService.localServerID = listenAddr (transport-shaped) - Field doc explicitly states: INTERIM, should be registry-assigned - Used only for replica/rebuild local identity 3 integration tests (qa_block_v2bridge_test.go): - CreatesEngineSender: ProcessAssignment → engine has sender + session - EpochBump: epoch 1 → invalidate → epoch 2 → new session - AddressChange: same ServerID, different IP → sender preserved, endpoint updated Co-Authored-By: Claude Opus 4.6 (1M context) --- weed/server/qa_block_v2bridge_test.go | 158 ++++++++++++++++++++++++++ weed/server/volume_server_block.go | 32 ++++-- 2 files changed, 183 insertions(+), 7 deletions(-) create mode 100644 weed/server/qa_block_v2bridge_test.go diff --git a/weed/server/qa_block_v2bridge_test.go b/weed/server/qa_block_v2bridge_test.go new file mode 100644 index 000000000..97cc71703 --- /dev/null +++ b/weed/server/qa_block_v2bridge_test.go @@ -0,0 +1,158 @@ +package weed_server + +import ( + "testing" + + engine "github.com/seaweedfs/seaweedfs/sw-block/engine/replication" + "github.com/seaweedfs/seaweedfs/weed/storage/blockvol" + "github.com/seaweedfs/seaweedfs/weed/storage/blockvol/v2bridge" +) + +func newTestControlBridge() *v2bridge.ControlBridge { + return v2bridge.NewControlBridge() +} + +func newTestOrchestrator() *engine.RecoveryOrchestrator { + return engine.NewRecoveryOrchestrator() +} + +// ============================================================ +// Phase 08 P1: ProcessAssignments → V2 engine state tests +// Proves real assignment delivery changes engine state. +// ============================================================ + +func TestV2Bridge_ProcessAssignment_CreatesEngineSender(t *testing.T) { + bs := &BlockService{ + v2Bridge: newTestControlBridge(), + v2Orchestrator: newTestOrchestrator(), + localServerID: "vs1:9333", + } + + // Real assignment from master. + bs.ProcessAssignments([]blockvol.BlockVolumeAssignment{ + { + Path: "pvc-test-1", + Epoch: 1, + Role: uint32(blockvol.RolePrimary), + ReplicaServerID: "vs2", + ReplicaDataAddr: "10.0.0.2:9333", + ReplicaCtrlAddr: "10.0.0.2:9334", + }, + }) + + // Engine should have a sender for this replica. + orch := bs.V2Orchestrator() + sender := orch.Registry.Sender("pvc-test-1/vs2") + if sender == nil { + t.Fatal("engine should have sender for pvc-test-1/vs2 after ProcessAssignment") + } + if !sender.HasActiveSession() { + t.Fatal("sender should have active session (catch-up recovery)") + } +} + +func TestV2Bridge_ProcessAssignment_EpochBump(t *testing.T) { + bs := &BlockService{ + v2Bridge: newTestControlBridge(), + v2Orchestrator: newTestOrchestrator(), + localServerID: "vs1:9333", + } + + // Epoch 1 assignment. + bs.ProcessAssignments([]blockvol.BlockVolumeAssignment{ + { + Path: "pvc-test-1", + Epoch: 1, + Role: uint32(blockvol.RolePrimary), + ReplicaServerID: "vs2", + ReplicaDataAddr: "10.0.0.2:9333", + ReplicaCtrlAddr: "10.0.0.2:9334", + }, + }) + + orch := bs.V2Orchestrator() + + // Epoch bump. + orch.InvalidateEpoch(2) + orch.UpdateSenderEpoch("pvc-test-1/vs2", 2) + + // Epoch 2 assignment. + bs.ProcessAssignments([]blockvol.BlockVolumeAssignment{ + { + Path: "pvc-test-1", + Epoch: 2, + Role: uint32(blockvol.RolePrimary), + ReplicaServerID: "vs2", + ReplicaDataAddr: "10.0.0.2:9333", + ReplicaCtrlAddr: "10.0.0.2:9334", + }, + }) + + sender := orch.Registry.Sender("pvc-test-1/vs2") + if sender == nil { + t.Fatal("sender should exist at epoch 2") + } + if !sender.HasActiveSession() { + t.Fatal("should have new session at epoch 2") + } + + // Log shows invalidation. + hasInvalidation := false + for _, e := range orch.Log.EventsFor("pvc-test-1/vs2") { + if e.Event == "session_invalidated" { + hasInvalidation = true + } + } + if !hasInvalidation { + t.Fatal("log must show session invalidation on epoch bump") + } +} + +func TestV2Bridge_ProcessAssignment_AddressChange(t *testing.T) { + bs := &BlockService{ + v2Bridge: newTestControlBridge(), + v2Orchestrator: newTestOrchestrator(), + localServerID: "vs1:9333", + } + + // First assignment. + bs.ProcessAssignments([]blockvol.BlockVolumeAssignment{ + { + Path: "pvc-test-1", + Epoch: 1, + Role: uint32(blockvol.RolePrimary), + ReplicaServerID: "vs2", + ReplicaDataAddr: "10.0.0.2:9333", + ReplicaCtrlAddr: "10.0.0.2:9334", + }, + }) + + orch := bs.V2Orchestrator() + sender := orch.Registry.Sender("pvc-test-1/vs2") + if sender == nil { + t.Fatal("sender should exist") + } + + // Same ServerID, different address (address change). + bs.ProcessAssignments([]blockvol.BlockVolumeAssignment{ + { + Path: "pvc-test-1", + Epoch: 1, + Role: uint32(blockvol.RolePrimary), + ReplicaServerID: "vs2", + ReplicaDataAddr: "10.0.0.5:9333", // different IP + ReplicaCtrlAddr: "10.0.0.5:9334", + }, + }) + + // Sender identity preserved (same pointer). + senderAfter := orch.Registry.Sender("pvc-test-1/vs2") + if senderAfter != sender { + t.Fatal("sender identity must be preserved across address change") + } + + // Endpoint updated. + if senderAfter.Endpoint().DataAddr != "10.0.0.5:9333" { + t.Fatalf("endpoint not updated: %s", senderAfter.Endpoint().DataAddr) + } +} diff --git a/weed/server/volume_server_block.go b/weed/server/volume_server_block.go index 03444ed9d..399bb9543 100644 --- a/weed/server/volume_server_block.go +++ b/weed/server/volume_server_block.go @@ -11,6 +11,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/storage" + engine "github.com/seaweedfs/seaweedfs/sw-block/engine/replication" "github.com/seaweedfs/seaweedfs/weed/storage/blockvol" "github.com/seaweedfs/seaweedfs/weed/storage/blockvol/iscsi" "github.com/seaweedfs/seaweedfs/weed/storage/blockvol/nvme" @@ -48,7 +49,17 @@ type BlockService struct { replStates map[string]*volReplState // keyed by volume path // V2 engine bridge (Phase 08 P1). - v2Bridge *v2bridge.ControlBridge + v2Bridge *v2bridge.ControlBridge + v2Orchestrator *engine.RecoveryOrchestrator + // localServerID: stable identity for this volume server. + // INTERIM: uses listenAddr (transport-shaped). Should be replaced + // with a registry-assigned stable server ID in a later hardening pass. + localServerID string +} + +// V2Orchestrator returns the V2 engine orchestrator for inspection/testing. +func (bs *BlockService) V2Orchestrator() *engine.RecoveryOrchestrator { + return bs.v2Orchestrator } // WireStateChangeNotify sets up shipper state change callbacks on all @@ -89,6 +100,8 @@ func StartBlockService(listenAddr, blockDir, iqnPrefix, portalAddr string, nvmeC listenAddr: listenAddr, nvmeListenAddr: nvmeCfg.ListenAddr, v2Bridge: v2bridge.NewControlBridge(), + v2Orchestrator: engine.NewRecoveryOrchestrator(), + localServerID: listenAddr, // INTERIM: transport-shaped, see field doc } // iSCSI target setup. @@ -333,16 +346,21 @@ func (bs *BlockService) DeleteBlockVol(name string) error { // ProcessAssignments applies assignments from master, including replication setup. // V2 bridge: also delivers each assignment to the V2 engine for recovery ownership. func (bs *BlockService) ProcessAssignments(assignments []blockvol.BlockVolumeAssignment) { - // V2 bridge: convert and deliver to engine (Phase 08 P1). - if bs.v2Bridge != nil { + // V2 bridge: convert and deliver to engine orchestrator (Phase 08 P1). + if bs.v2Bridge != nil && bs.v2Orchestrator != nil { for _, a := range assignments { - intent := bs.v2Bridge.ConvertAssignment(a, bs.listenAddr) - _ = intent // TODO(P2): deliver to engine orchestrator - glog.V(1).Infof("v2bridge: converted assignment %s epoch=%d → %d replicas", - a.Path, a.Epoch, len(intent.Replicas)) + intent := bs.v2Bridge.ConvertAssignment(a, bs.localServerID) + result := bs.v2Orchestrator.ProcessAssignment(intent) + glog.V(1).Infof("v2bridge: assignment %s epoch=%d → added=%d removed=%d sessions=%d", + a.Path, a.Epoch, len(result.Added), len(result.Removed), + len(result.SessionsCreated)+len(result.SessionsSuperseded)) } } + // V1 processing (requires blockStore). + if bs.blockStore == nil { + return + } for _, a := range assignments { role := blockvol.RoleFromWire(a.Role) ttl := blockvol.LeaseTTLFromWire(a.LeaseTtlMs)