Browse Source

fix: deliver assignment intent to real engine orchestrator, not discard

Finding 1: ProcessAssignments now calls v2Orchestrator.ProcessAssignment
- BlockService.v2Orchestrator field (RecoveryOrchestrator)
- ProcessAssignment result logged at glog V(1)
- No more `_ = intent` — engine state actually changes

Finding 2: localServerID documented as interim
- BlockService.localServerID = listenAddr (transport-shaped)
- Field doc explicitly states: INTERIM, should be registry-assigned
- Used only for replica/rebuild local identity

3 integration tests (qa_block_v2bridge_test.go):
- CreatesEngineSender: ProcessAssignment → engine has sender + session
- EpochBump: epoch 1 → invalidate → epoch 2 → new session
- AddressChange: same ServerID, different IP → sender preserved,
  endpoint updated

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
pingqiu 2 days ago
parent
commit
04bc261f9b
  1. 158
      weed/server/qa_block_v2bridge_test.go
  2. 32
      weed/server/volume_server_block.go

158
weed/server/qa_block_v2bridge_test.go

@ -0,0 +1,158 @@
package weed_server
import (
"testing"
engine "github.com/seaweedfs/seaweedfs/sw-block/engine/replication"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol/v2bridge"
)
func newTestControlBridge() *v2bridge.ControlBridge {
return v2bridge.NewControlBridge()
}
func newTestOrchestrator() *engine.RecoveryOrchestrator {
return engine.NewRecoveryOrchestrator()
}
// ============================================================
// Phase 08 P1: ProcessAssignments → V2 engine state tests
// Proves real assignment delivery changes engine state.
// ============================================================
func TestV2Bridge_ProcessAssignment_CreatesEngineSender(t *testing.T) {
bs := &BlockService{
v2Bridge: newTestControlBridge(),
v2Orchestrator: newTestOrchestrator(),
localServerID: "vs1:9333",
}
// Real assignment from master.
bs.ProcessAssignments([]blockvol.BlockVolumeAssignment{
{
Path: "pvc-test-1",
Epoch: 1,
Role: uint32(blockvol.RolePrimary),
ReplicaServerID: "vs2",
ReplicaDataAddr: "10.0.0.2:9333",
ReplicaCtrlAddr: "10.0.0.2:9334",
},
})
// Engine should have a sender for this replica.
orch := bs.V2Orchestrator()
sender := orch.Registry.Sender("pvc-test-1/vs2")
if sender == nil {
t.Fatal("engine should have sender for pvc-test-1/vs2 after ProcessAssignment")
}
if !sender.HasActiveSession() {
t.Fatal("sender should have active session (catch-up recovery)")
}
}
func TestV2Bridge_ProcessAssignment_EpochBump(t *testing.T) {
bs := &BlockService{
v2Bridge: newTestControlBridge(),
v2Orchestrator: newTestOrchestrator(),
localServerID: "vs1:9333",
}
// Epoch 1 assignment.
bs.ProcessAssignments([]blockvol.BlockVolumeAssignment{
{
Path: "pvc-test-1",
Epoch: 1,
Role: uint32(blockvol.RolePrimary),
ReplicaServerID: "vs2",
ReplicaDataAddr: "10.0.0.2:9333",
ReplicaCtrlAddr: "10.0.0.2:9334",
},
})
orch := bs.V2Orchestrator()
// Epoch bump.
orch.InvalidateEpoch(2)
orch.UpdateSenderEpoch("pvc-test-1/vs2", 2)
// Epoch 2 assignment.
bs.ProcessAssignments([]blockvol.BlockVolumeAssignment{
{
Path: "pvc-test-1",
Epoch: 2,
Role: uint32(blockvol.RolePrimary),
ReplicaServerID: "vs2",
ReplicaDataAddr: "10.0.0.2:9333",
ReplicaCtrlAddr: "10.0.0.2:9334",
},
})
sender := orch.Registry.Sender("pvc-test-1/vs2")
if sender == nil {
t.Fatal("sender should exist at epoch 2")
}
if !sender.HasActiveSession() {
t.Fatal("should have new session at epoch 2")
}
// Log shows invalidation.
hasInvalidation := false
for _, e := range orch.Log.EventsFor("pvc-test-1/vs2") {
if e.Event == "session_invalidated" {
hasInvalidation = true
}
}
if !hasInvalidation {
t.Fatal("log must show session invalidation on epoch bump")
}
}
func TestV2Bridge_ProcessAssignment_AddressChange(t *testing.T) {
bs := &BlockService{
v2Bridge: newTestControlBridge(),
v2Orchestrator: newTestOrchestrator(),
localServerID: "vs1:9333",
}
// First assignment.
bs.ProcessAssignments([]blockvol.BlockVolumeAssignment{
{
Path: "pvc-test-1",
Epoch: 1,
Role: uint32(blockvol.RolePrimary),
ReplicaServerID: "vs2",
ReplicaDataAddr: "10.0.0.2:9333",
ReplicaCtrlAddr: "10.0.0.2:9334",
},
})
orch := bs.V2Orchestrator()
sender := orch.Registry.Sender("pvc-test-1/vs2")
if sender == nil {
t.Fatal("sender should exist")
}
// Same ServerID, different address (address change).
bs.ProcessAssignments([]blockvol.BlockVolumeAssignment{
{
Path: "pvc-test-1",
Epoch: 1,
Role: uint32(blockvol.RolePrimary),
ReplicaServerID: "vs2",
ReplicaDataAddr: "10.0.0.5:9333", // different IP
ReplicaCtrlAddr: "10.0.0.5:9334",
},
})
// Sender identity preserved (same pointer).
senderAfter := orch.Registry.Sender("pvc-test-1/vs2")
if senderAfter != sender {
t.Fatal("sender identity must be preserved across address change")
}
// Endpoint updated.
if senderAfter.Endpoint().DataAddr != "10.0.0.5:9333" {
t.Fatalf("endpoint not updated: %s", senderAfter.Endpoint().DataAddr)
}
}

32
weed/server/volume_server_block.go

@ -11,6 +11,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/storage"
engine "github.com/seaweedfs/seaweedfs/sw-block/engine/replication"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol/iscsi"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol/nvme"
@ -48,7 +49,17 @@ type BlockService struct {
replStates map[string]*volReplState // keyed by volume path
// V2 engine bridge (Phase 08 P1).
v2Bridge *v2bridge.ControlBridge
v2Bridge *v2bridge.ControlBridge
v2Orchestrator *engine.RecoveryOrchestrator
// localServerID: stable identity for this volume server.
// INTERIM: uses listenAddr (transport-shaped). Should be replaced
// with a registry-assigned stable server ID in a later hardening pass.
localServerID string
}
// V2Orchestrator returns the V2 engine orchestrator for inspection/testing.
func (bs *BlockService) V2Orchestrator() *engine.RecoveryOrchestrator {
return bs.v2Orchestrator
}
// WireStateChangeNotify sets up shipper state change callbacks on all
@ -89,6 +100,8 @@ func StartBlockService(listenAddr, blockDir, iqnPrefix, portalAddr string, nvmeC
listenAddr: listenAddr,
nvmeListenAddr: nvmeCfg.ListenAddr,
v2Bridge: v2bridge.NewControlBridge(),
v2Orchestrator: engine.NewRecoveryOrchestrator(),
localServerID: listenAddr, // INTERIM: transport-shaped, see field doc
}
// iSCSI target setup.
@ -333,16 +346,21 @@ func (bs *BlockService) DeleteBlockVol(name string) error {
// ProcessAssignments applies assignments from master, including replication setup.
// V2 bridge: also delivers each assignment to the V2 engine for recovery ownership.
func (bs *BlockService) ProcessAssignments(assignments []blockvol.BlockVolumeAssignment) {
// V2 bridge: convert and deliver to engine (Phase 08 P1).
if bs.v2Bridge != nil {
// V2 bridge: convert and deliver to engine orchestrator (Phase 08 P1).
if bs.v2Bridge != nil && bs.v2Orchestrator != nil {
for _, a := range assignments {
intent := bs.v2Bridge.ConvertAssignment(a, bs.listenAddr)
_ = intent // TODO(P2): deliver to engine orchestrator
glog.V(1).Infof("v2bridge: converted assignment %s epoch=%d → %d replicas",
a.Path, a.Epoch, len(intent.Replicas))
intent := bs.v2Bridge.ConvertAssignment(a, bs.localServerID)
result := bs.v2Orchestrator.ProcessAssignment(intent)
glog.V(1).Infof("v2bridge: assignment %s epoch=%d → added=%d removed=%d sessions=%d",
a.Path, a.Epoch, len(result.Added), len(result.Removed),
len(result.SessionsCreated)+len(result.SessionsSuperseded))
}
}
// V1 processing (requires blockStore).
if bs.blockStore == nil {
return
}
for _, a := range assignments {
role := blockvol.RoleFromWire(a.Role)
ttl := blockvol.LeaseTTLFromWire(a.LeaseTtlMs)

Loading…
Cancel
Save