diff --git a/weed/server/volume_server_block.go b/weed/server/volume_server_block.go index af1d02a06..d893c4e90 100644 --- a/weed/server/volume_server_block.go +++ b/weed/server/volume_server_block.go @@ -33,13 +33,14 @@ type NVMeConfig struct { // BlockService manages block volumes and the iSCSI/NVMe target servers. type BlockService struct { - blockStore *storage.BlockVolumeStore - targetServer *iscsi.TargetServer - nvmeServer *nvme.Server - iqnPrefix string - nqnPrefix string - blockDir string - listenAddr string + blockStore *storage.BlockVolumeStore + targetServer *iscsi.TargetServer + nvmeServer *nvme.Server + iqnPrefix string + nqnPrefix string + blockDir string + listenAddr string + nvmeListenAddr string // Replication state (CP6-3). replMu sync.RWMutex @@ -63,11 +64,12 @@ func StartBlockService(listenAddr, blockDir, iqnPrefix, portalAddr string, nvmeC } bs := &BlockService{ - blockStore: storage.NewBlockVolumeStore(), - iqnPrefix: iqnPrefix, - nqnPrefix: nqnPrefix, - blockDir: blockDir, - listenAddr: listenAddr, + blockStore: storage.NewBlockVolumeStore(), + iqnPrefix: iqnPrefix, + nqnPrefix: nqnPrefix, + blockDir: blockDir, + listenAddr: listenAddr, + nvmeListenAddr: nvmeCfg.ListenAddr, } // iSCSI target setup. @@ -165,7 +167,7 @@ func (bs *BlockService) registerVolume(vol *blockvol.BlockVol, name string) { bs.targetServer.AddVolume(iqn, adapter) if bs.nvmeServer != nil { - nqn := bs.nqnPrefix + blockvol.SanitizeIQN(name) + nqn := blockvol.BuildNQN(bs.nqnPrefix, name) nvmeAdapter := nvme.NewNVMeAdapter(vol) bs.nvmeServer.AddVolume(nqn, nvmeAdapter, nvmeAdapter.DeviceNGUID()) } @@ -188,6 +190,19 @@ func (bs *BlockService) ListenAddr() string { return bs.listenAddr } +// NvmeListenAddr returns the configured NVMe/TCP target listen address, or empty if NVMe is disabled. +func (bs *BlockService) NvmeListenAddr() string { + if bs.nvmeServer != nil { + return bs.nvmeListenAddr + } + return "" +} + +// NQN returns the NVMe subsystem NQN for a volume name. +func (bs *BlockService) NQN(name string) string { + return blockvol.BuildNQN(bs.nqnPrefix, name) +} + // CreateBlockVol creates a new .blk file, registers it with BlockVolumeStore // and iSCSI TargetServer. Returns path, IQN, iSCSI addr. // Idempotent: if volume already exists with same or larger size, returns existing info. @@ -209,7 +224,7 @@ func (bs *BlockService) CreateBlockVol(name string, sizeBytes uint64, diskType s adapter := blockvol.NewBlockVolAdapter(vol) bs.targetServer.AddVolume(iqn, adapter) if bs.nvmeServer != nil { - nqn := bs.nqnPrefix + blockvol.SanitizeIQN(name) + nqn := blockvol.BuildNQN(bs.nqnPrefix, name) nvmeAdapter := nvme.NewNVMeAdapter(vol) bs.nvmeServer.AddVolume(nqn, nvmeAdapter, nvmeAdapter.DeviceNGUID()) } @@ -250,7 +265,7 @@ func (bs *BlockService) CreateBlockVol(name string, sizeBytes uint64, diskType s bs.targetServer.AddVolume(iqn, adapter) if bs.nvmeServer != nil { - nqn := bs.nqnPrefix + blockvol.SanitizeIQN(name) + nqn := blockvol.BuildNQN(bs.nqnPrefix, name) nvmeAdapter := nvme.NewNVMeAdapter(vol) bs.nvmeServer.AddVolume(nqn, nvmeAdapter, nvmeAdapter.DeviceNGUID()) } @@ -273,7 +288,7 @@ func (bs *BlockService) DeleteBlockVol(name string) error { // Remove from NVMe target. if bs.nvmeServer != nil { - nqn := bs.nqnPrefix + blockvol.SanitizeIQN(name) + nqn := blockvol.BuildNQN(bs.nqnPrefix, name) bs.nvmeServer.RemoveVolume(nqn) } diff --git a/weed/storage/blockvol/csi/controller.go b/weed/storage/blockvol/csi/controller.go index c9d33bce3..6ab00f998 100644 --- a/weed/storage/blockvol/csi/controller.go +++ b/weed/storage/blockvol/csi/controller.go @@ -77,11 +77,18 @@ func (s *controllerServer) CreateVolume(_ context.Context, req *csi.CreateVolume }, } - // Attach volume_context with iSCSI target info for NodeStageVolume. - if info.ISCSIAddr != "" || info.IQN != "" { - resp.Volume.VolumeContext = map[string]string{ - "iscsiAddr": info.ISCSIAddr, - "iqn": info.IQN, + // Attach volume_context with target info for NodeStageVolume. + hasISCSI := info.ISCSIAddr != "" || info.IQN != "" + hasNVMe := info.NvmeAddr != "" + if hasISCSI || hasNVMe { + resp.Volume.VolumeContext = make(map[string]string) + if hasISCSI { + resp.Volume.VolumeContext["iscsiAddr"] = info.ISCSIAddr + resp.Volume.VolumeContext["iqn"] = info.IQN + } + if hasNVMe { + resp.Volume.VolumeContext["nvmeAddr"] = info.NvmeAddr + resp.Volume.VolumeContext["nqn"] = info.NQN } } @@ -114,11 +121,16 @@ func (s *controllerServer) ControllerPublishVolume(_ context.Context, req *csi.C return nil, status.Errorf(codes.NotFound, "volume %q not found: %v", req.VolumeId, err) } + pubCtx := map[string]string{ + "iscsiAddr": info.ISCSIAddr, + "iqn": info.IQN, + } + if info.NvmeAddr != "" { + pubCtx["nvmeAddr"] = info.NvmeAddr + pubCtx["nqn"] = info.NQN + } return &csi.ControllerPublishVolumeResponse{ - PublishContext: map[string]string{ - "iscsiAddr": info.ISCSIAddr, - "iqn": info.IQN, - }, + PublishContext: pubCtx, }, nil } diff --git a/weed/storage/blockvol/csi/node.go b/weed/storage/blockvol/csi/node.go index c62a87cb2..dae9393f5 100644 --- a/weed/storage/blockvol/csi/node.go +++ b/weed/storage/blockvol/csi/node.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "os" + "path/filepath" "sync" "github.com/container-storage-interface/spec/lib/go/csi" @@ -13,10 +14,18 @@ import ( "google.golang.org/grpc/status" ) +const ( + transportISCSI = "iscsi" + transportNVMe = "nvme" +) + // stagedVolumeInfo tracks info needed for NodeUnstageVolume and NodeExpandVolume. type stagedVolumeInfo struct { iqn string iscsiAddr string + nqn string // NVMe subsystem NQN + nvmeAddr string // NVMe/TCP target address + transport string // "iscsi" or "nvme" isLocal bool // true if volume is served by local VolumeManager fsType string // filesystem type (ext4, xfs, etc.) stagingPath string // staging mount path @@ -27,7 +36,9 @@ type nodeServer struct { mgr *VolumeManager // may be nil in controller-only mode nodeID string iqnPrefix string // for IQN derivation fallback on restart + nqnPrefix string // for NQN derivation fallback on restart iscsiUtil ISCSIUtil + nvmeUtil NVMeUtil // may be nil if NVMe not available mountUtil MountUtil logger *log.Logger @@ -35,6 +46,10 @@ type nodeServer struct { staged map[string]*stagedVolumeInfo // volumeID -> staged info } +// transportFile is the filename written inside the staging directory to persist +// the transport type across CSI plugin restarts. +const transportFile = ".transport" + func (s *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { volumeID := req.VolumeId stagingPath := req.StagingTargetPath @@ -59,30 +74,42 @@ func (s *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolu return &csi.NodeStageVolumeResponse{}, nil } - // Determine iSCSI target info. - // Priority: publish_context (fresh from ControllerPublish, reflects failover) - // > volume_context (from CreateVolume, may be stale after failover) - // > local volume manager fallback. + // Resolve iSCSI target info. + // Priority: publish_context > volume_context > local volume manager fallback. var iqn, portal string isLocal := false if req.PublishContext != nil && req.PublishContext["iscsiAddr"] != "" && req.PublishContext["iqn"] != "" { - // Fresh address from ControllerPublishVolume (reflects current primary). portal = req.PublishContext["iscsiAddr"] iqn = req.PublishContext["iqn"] } else if req.VolumeContext != nil && req.VolumeContext["iscsiAddr"] != "" && req.VolumeContext["iqn"] != "" { - // Fallback: volume_context from CreateVolume (may be stale after failover). portal = req.VolumeContext["iscsiAddr"] iqn = req.VolumeContext["iqn"] } else if s.mgr != nil { - // Local fallback: open volume via local VolumeManager. isLocal = true if err := s.mgr.OpenVolume(volumeID); err != nil { return nil, status.Errorf(codes.Internal, "open volume: %v", err) } iqn = s.mgr.VolumeIQN(volumeID) portal = s.mgr.ListenAddr() - } else { + } + + // Resolve NVMe target info (same priority chain). + // PublishContext > VolumeContext > local VolumeManager. + var nqn, nvmeAddr string + if req.PublishContext != nil && req.PublishContext["nvmeAddr"] != "" && req.PublishContext["nqn"] != "" { + nvmeAddr = req.PublishContext["nvmeAddr"] + nqn = req.PublishContext["nqn"] + } else if req.VolumeContext != nil && req.VolumeContext["nvmeAddr"] != "" && req.VolumeContext["nqn"] != "" { + nvmeAddr = req.VolumeContext["nvmeAddr"] + nqn = req.VolumeContext["nqn"] + } else if s.mgr != nil && s.mgr.NvmeAddr() != "" { + nvmeAddr = s.mgr.NvmeAddr() + nqn = s.mgr.VolumeNQN(volumeID) + } + + // No transport info at all (neither iSCSI nor NVMe resolved, no local mgr). + if iqn == "" && nqn == "" { return nil, status.Error(codes.FailedPrecondition, "no volume_context and no local volume manager") } @@ -97,26 +124,58 @@ func (s *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolu } }() - // Check if already logged in, skip login if so. - loggedIn, err := s.iscsiUtil.IsLoggedIn(ctx, iqn) - if err != nil { - return nil, status.Errorf(codes.Internal, "check iscsi login: %v", err) - } + // Transport selection: prefer NVMe if supported, fall back to iSCSI. + var device, transport string - if !loggedIn { - // Discovery + login. - if err := s.iscsiUtil.Discovery(ctx, portal); err != nil { - return nil, status.Errorf(codes.Internal, "iscsi discovery: %v", err) + nvmeAvailable := nvmeAddr != "" && nqn != "" && s.nvmeUtil != nil && s.nvmeUtil.IsNVMeTCPAvailable() + if nvmeAvailable { + // NVMe path — fail fast on error, no fallback to iSCSI. + transport = transportNVMe + + connected, cerr := s.nvmeUtil.IsConnected(ctx, nqn) + if cerr != nil { + return nil, status.Errorf(codes.Internal, "check nvme connection: %v", cerr) } - if err := s.iscsiUtil.Login(ctx, iqn, portal); err != nil { - return nil, status.Errorf(codes.Internal, "iscsi login: %v", err) + if !connected { + if cerr := s.nvmeUtil.Connect(ctx, nqn, nvmeAddr); cerr != nil { + return nil, status.Errorf(codes.Internal, "nvme connect: %v", cerr) + } } - } - // Wait for device to appear. - device, err := s.iscsiUtil.GetDeviceByIQN(ctx, iqn) - if err != nil { - return nil, status.Errorf(codes.Internal, "get device: %v", err) + // Cleanup NVMe on subsequent failures. + defer func() { + if !success { + s.nvmeUtil.Disconnect(ctx, nqn) + } + }() + + device, err = s.nvmeUtil.GetDeviceByNQN(ctx, nqn) + if err != nil { + return nil, status.Errorf(codes.Internal, "nvme get device: %v", err) + } + } else if iqn != "" && portal != "" { + // iSCSI path (existing code). + transport = transportISCSI + + loggedIn, lerr := s.iscsiUtil.IsLoggedIn(ctx, iqn) + if lerr != nil { + return nil, status.Errorf(codes.Internal, "check iscsi login: %v", lerr) + } + if !loggedIn { + if err := s.iscsiUtil.Discovery(ctx, portal); err != nil { + return nil, status.Errorf(codes.Internal, "iscsi discovery: %v", err) + } + if err := s.iscsiUtil.Login(ctx, iqn, portal); err != nil { + return nil, status.Errorf(codes.Internal, "iscsi login: %v", err) + } + } + + device, err = s.iscsiUtil.GetDeviceByIQN(ctx, iqn) + if err != nil { + return nil, status.Errorf(codes.Internal, "get device: %v", err) + } + } else { + return nil, status.Error(codes.FailedPrecondition, "no transport available") } // Ensure staging directory exists. @@ -136,6 +195,11 @@ func (s *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolu return nil, status.Errorf(codes.Internal, "format and mount: %v", err) } + // Write transport marker for restart recovery. + if werr := writeTransportFile(stagingPath, transport); werr != nil { + s.logger.Printf("NodeStageVolume: %s: %v (non-fatal)", volumeID, werr) + } + // Track staged volume for unstage. s.stagedMu.Lock() if s.staged == nil { @@ -144,6 +208,9 @@ func (s *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolu s.staged[volumeID] = &stagedVolumeInfo{ iqn: iqn, iscsiAddr: portal, + nqn: nqn, + nvmeAddr: nvmeAddr, + transport: transport, isLocal: isLocal, fsType: fsType, stagingPath: stagingPath, @@ -151,7 +218,7 @@ func (s *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolu s.stagedMu.Unlock() success = true - s.logger.Printf("NodeStageVolume: %s staged at %s (device=%s, iqn=%s, local=%v)", volumeID, stagingPath, device, iqn, isLocal) + s.logger.Printf("NodeStageVolume: %s staged at %s (device=%s, transport=%s, local=%v)", volumeID, stagingPath, device, transport, isLocal) return &csi.NodeStageVolumeResponse{}, nil } @@ -166,26 +233,49 @@ func (s *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstage return nil, status.Error(codes.InvalidArgument, "staging target path is required") } - // Look up staged info. If not found (e.g. driver restarted), derive IQN. + // Look up staged info. s.stagedMu.Lock() info := s.staged[volumeID] s.stagedMu.Unlock() - var iqn string + // Determine transport and identifiers. + var iqn, nqn, transport string isLocal := false + if info != nil { iqn = info.iqn + nqn = info.nqn + transport = info.transport isLocal = info.isLocal } else { - // Restart fallback: derive IQN from volumeID. - // iscsiadm -m node -T --logout works without knowing the portal. + // Restart fallback: read .transport file from staging path. + transport = readTransportFile(stagingPath) + + // Derive identifiers. if s.mgr != nil { iqn = s.mgr.VolumeIQN(volumeID) + nqn = s.mgr.VolumeNQN(volumeID) isLocal = true - } else if s.iqnPrefix != "" { - iqn = s.iqnPrefix + ":" + blockvol.SanitizeIQN(volumeID) + } else { + if s.iqnPrefix != "" { + iqn = s.iqnPrefix + ":" + blockvol.SanitizeIQN(volumeID) + } + if s.nqnPrefix != "" { + nqn = blockvol.BuildNQN(s.nqnPrefix, volumeID) + } } - s.logger.Printf("NodeUnstageVolume: %s not in staged map, derived iqn=%s", volumeID, iqn) + + // If no .transport file, probe NVMe connection to determine transport. + if transport == "" && nqn != "" && s.nvmeUtil != nil { + if connected, _ := s.nvmeUtil.IsConnected(ctx, nqn); connected { + transport = transportNVMe + } + } + if transport == "" { + transport = transportISCSI // default fallback + } + + s.logger.Printf("NodeUnstageVolume: %s not in staged map, derived transport=%s", volumeID, transport) } // Best-effort cleanup: always attempt all steps even if one fails. @@ -197,12 +287,24 @@ func (s *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstage firstErr = err } - // iSCSI logout. - if iqn != "" { - if err := s.iscsiUtil.Logout(ctx, iqn); err != nil { - s.logger.Printf("NodeUnstageVolume: logout error: %v", err) - if firstErr == nil { - firstErr = err + // Disconnect transport. + switch transport { + case transportNVMe: + if nqn != "" && s.nvmeUtil != nil { + if err := s.nvmeUtil.Disconnect(ctx, nqn); err != nil { + s.logger.Printf("NodeUnstageVolume: nvme disconnect error: %v", err) + if firstErr == nil { + firstErr = err + } + } + } + default: // iSCSI + if iqn != "" { + if err := s.iscsiUtil.Logout(ctx, iqn); err != nil { + s.logger.Printf("NodeUnstageVolume: logout error: %v", err) + if firstErr == nil { + firstErr = err + } } } } @@ -218,10 +320,13 @@ func (s *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstage } if firstErr != nil { - // Keep staged entry so retry has correct isLocal/iqn info. + // Keep staged entry so retry has correct info. return nil, status.Errorf(codes.Internal, "unstage: %v", firstErr) } + // Clean up transport file. + os.Remove(filepath.Join(stagingPath, transportFile)) + // Remove from staged map only after successful cleanup. s.stagedMu.Lock() delete(s.staged, volumeID) @@ -310,24 +415,38 @@ func (s *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVo return nil, status.Errorf(codes.FailedPrecondition, "volume %q not staged", req.VolumeId) } - // Check that iSCSI session is active. - loggedIn, err := s.iscsiUtil.IsLoggedIn(ctx, info.iqn) - if err != nil { - return nil, status.Errorf(codes.Internal, "check iSCSI session: %v", err) - } - if !loggedIn { - return nil, status.Errorf(codes.FailedPrecondition, "volume %q not staged: iSCSI session not active", req.VolumeId) - } - - // Rescan device to pick up new size. - if err := s.iscsiUtil.RescanDevice(ctx, info.iqn); err != nil { - return nil, status.Errorf(codes.Internal, "rescan iSCSI device: %v", err) - } + var device string + var err error - // Find the device path. - device, err := s.iscsiUtil.GetDeviceByIQN(ctx, info.iqn) - if err != nil { - return nil, status.Errorf(codes.Internal, "find device: %v", err) + switch info.transport { + case transportNVMe: + // NVMe: rescan namespace, then find device. + if s.nvmeUtil == nil { + return nil, status.Errorf(codes.Internal, "nvme util not available") + } + if err := s.nvmeUtil.Rescan(ctx, info.nqn); err != nil { + return nil, status.Errorf(codes.Internal, "nvme rescan: %v", err) + } + device, err = s.nvmeUtil.GetDeviceByNQN(ctx, info.nqn) + if err != nil { + return nil, status.Errorf(codes.Internal, "nvme find device: %v", err) + } + default: // iSCSI + // Check that iSCSI session is active. + loggedIn, lerr := s.iscsiUtil.IsLoggedIn(ctx, info.iqn) + if lerr != nil { + return nil, status.Errorf(codes.Internal, "check iSCSI session: %v", lerr) + } + if !loggedIn { + return nil, status.Errorf(codes.FailedPrecondition, "volume %q not staged: iSCSI session not active", req.VolumeId) + } + if err := s.iscsiUtil.RescanDevice(ctx, info.iqn); err != nil { + return nil, status.Errorf(codes.Internal, "rescan iSCSI device: %v", err) + } + device, err = s.iscsiUtil.GetDeviceByIQN(ctx, info.iqn) + if err != nil { + return nil, status.Errorf(codes.Internal, "find device: %v", err) + } } // Determine mount path and fsType. @@ -345,7 +464,7 @@ func (s *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVo return nil, status.Errorf(codes.Internal, "resize filesystem: %v", err) } - s.logger.Printf("NodeExpandVolume: %s expanded (device=%s, fs=%s)", req.VolumeId, device, fsType) + s.logger.Printf("NodeExpandVolume: %s expanded (device=%s, transport=%s, fs=%s)", req.VolumeId, device, info.transport, fsType) capacity := int64(0) if req.CapacityRange != nil { @@ -384,3 +503,28 @@ func (s *nodeServer) NodeGetInfo(_ context.Context, _ *csi.NodeGetInfoRequest) ( }, }, nil } + +// writeTransportFile writes the transport type to a file inside the staging path +// so it can be recovered after CSI plugin restart. +func writeTransportFile(stagingPath, transport string) error { + path := filepath.Join(stagingPath, transportFile) + if err := os.WriteFile(path, []byte(transport), 0600); err != nil { + return fmt.Errorf("write transport file: %w", err) + } + return nil +} + +// readTransportFile reads the transport type from the staging path. +// Returns empty string if the file doesn't exist. +func readTransportFile(stagingPath string) string { + path := filepath.Join(stagingPath, transportFile) + data, err := os.ReadFile(path) + if err != nil { + return "" + } + t := string(data) + if t == transportISCSI || t == transportNVMe { + return t + } + return "" +} diff --git a/weed/storage/blockvol/csi/nvme_node_test.go b/weed/storage/blockvol/csi/nvme_node_test.go new file mode 100644 index 000000000..b1240dd82 --- /dev/null +++ b/weed/storage/blockvol/csi/nvme_node_test.go @@ -0,0 +1,1222 @@ +package csi + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log" + "os" + "path/filepath" + "strings" + "sync" + "testing" + + "github.com/container-storage-interface/spec/lib/go/csi" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// --- Transport selection tests --- + +func TestNodeStageVolume_NVMe(t *testing.T) { + mi := newMockISCSIUtil() + mn := newMockNVMeUtil() + mn.nvmeTCPAvailable = true + mn.getDeviceResult = "/dev/nvme0n1" + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iqnPrefix: "iqn.2024.com.seaweedfs", + nqnPrefix: "nqn.2024-01.com.seaweedfs:vol.", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[test-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + stagingPath := t.TempDir() + + _, err := ns.NodeStageVolume(context.Background(), &csi.NodeStageVolumeRequest{ + VolumeId: "nvme-vol", + StagingTargetPath: stagingPath, + VolumeCapability: testVolCap(), + PublishContext: map[string]string{ + "iscsiAddr": "10.0.0.5:3260", + "iqn": "iqn.2024.com.seaweedfs:nvme-vol", + "nvmeAddr": "10.0.0.5:4420", + "nqn": "nqn.2024.com.seaweedfs:nvme-vol", + }, + }) + if err != nil { + t.Fatalf("NodeStageVolume: %v", err) + } + + // Verify NVMe connect was called. + foundConnect := false + for _, c := range mn.calls { + if strings.HasPrefix(c, "connect:nqn.2024.com.seaweedfs:nvme-vol:10.0.0.5:4420") { + foundConnect = true + } + } + if !foundConnect { + t.Fatalf("expected NVMe connect call, got: %v", mn.calls) + } + + // Verify iSCSI was NOT called. + if len(mi.calls) > 0 { + t.Fatalf("expected no iSCSI calls, got: %v", mi.calls) + } + + // Verify transport is NVMe. + ns.stagedMu.Lock() + info := ns.staged["nvme-vol"] + ns.stagedMu.Unlock() + if info == nil { + t.Fatal("expected nvme-vol in staged map") + } + if info.transport != transportNVMe { + t.Fatalf("expected transport=nvme, got %q", info.transport) + } + + // Verify .transport file was written. + data, err := os.ReadFile(filepath.Join(stagingPath, transportFile)) + if err != nil { + t.Fatalf("read transport file: %v", err) + } + if string(data) != "nvme" { + t.Fatalf("transport file: got %q, want nvme", string(data)) + } +} + +func TestNodeStageVolume_NVMe_FallbackISCSI_ModuleNotLoaded(t *testing.T) { + mi := newMockISCSIUtil() + mi.getDeviceResult = "/dev/sdb" + mn := newMockNVMeUtil() + mn.nvmeTCPAvailable = false // nvme_tcp NOT loaded + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[test-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + stagingPath := t.TempDir() + + _, err := ns.NodeStageVolume(context.Background(), &csi.NodeStageVolumeRequest{ + VolumeId: "fallback-vol", + StagingTargetPath: stagingPath, + VolumeCapability: testVolCap(), + PublishContext: map[string]string{ + "iscsiAddr": "10.0.0.5:3260", + "iqn": "iqn.2024.com.seaweedfs:fallback-vol", + "nvmeAddr": "10.0.0.5:4420", + "nqn": "nqn.2024.com.seaweedfs:fallback-vol", + }, + }) + if err != nil { + t.Fatalf("NodeStageVolume: %v", err) + } + + // Should have used iSCSI. + if len(mi.calls) == 0 { + t.Fatal("expected iSCSI calls") + } + if len(mn.calls) > 0 { + t.Fatalf("expected no NVMe calls, got: %v", mn.calls) + } + + // Verify transport is iSCSI. + ns.stagedMu.Lock() + info := ns.staged["fallback-vol"] + ns.stagedMu.Unlock() + if info.transport != transportISCSI { + t.Fatalf("expected transport=iscsi, got %q", info.transport) + } +} + +func TestNodeStageVolume_NVMe_ConnectFails_NoFallback(t *testing.T) { + mi := newMockISCSIUtil() + mi.getDeviceResult = "/dev/sdb" + mn := newMockNVMeUtil() + mn.nvmeTCPAvailable = true + mn.connectErr = errors.New("connect refused") + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[test-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + stagingPath := t.TempDir() + + _, err := ns.NodeStageVolume(context.Background(), &csi.NodeStageVolumeRequest{ + VolumeId: "fail-vol", + StagingTargetPath: stagingPath, + VolumeCapability: testVolCap(), + PublishContext: map[string]string{ + "iscsiAddr": "10.0.0.5:3260", + "iqn": "iqn.2024.com.seaweedfs:fail-vol", + "nvmeAddr": "10.0.0.5:4420", + "nqn": "nqn.2024.com.seaweedfs:fail-vol", + }, + }) + if err == nil { + t.Fatal("expected error when NVMe connect fails") + } + + // Verify error and no fallback to iSCSI. + st, _ := status.FromError(err) + if st.Code() != codes.Internal { + t.Fatalf("expected Internal error, got %v", st.Code()) + } + if !strings.Contains(err.Error(), "nvme connect") { + t.Fatalf("expected nvme connect in error, got: %v", err) + } + + // iSCSI should NOT have been called (no fallback). + if len(mi.calls) > 0 { + t.Fatalf("expected no iSCSI fallback calls, got: %v", mi.calls) + } +} + +func TestNodeStageVolume_ISCSIOnly(t *testing.T) { + mi := newMockISCSIUtil() + mi.getDeviceResult = "/dev/sdb" + mn := newMockNVMeUtil() + mn.nvmeTCPAvailable = true + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[test-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + stagingPath := t.TempDir() + + // No NVMe fields in context → iSCSI only. + _, err := ns.NodeStageVolume(context.Background(), &csi.NodeStageVolumeRequest{ + VolumeId: "iscsi-only", + StagingTargetPath: stagingPath, + VolumeCapability: testVolCap(), + PublishContext: map[string]string{ + "iscsiAddr": "10.0.0.5:3260", + "iqn": "iqn.2024.com.seaweedfs:iscsi-only", + }, + }) + if err != nil { + t.Fatalf("NodeStageVolume: %v", err) + } + + // iSCSI should be used. + if len(mi.calls) == 0 { + t.Fatal("expected iSCSI calls") + } + if len(mn.calls) > 0 { + t.Fatalf("expected no NVMe calls, got: %v", mn.calls) + } + + ns.stagedMu.Lock() + info := ns.staged["iscsi-only"] + ns.stagedMu.Unlock() + if info.transport != transportISCSI { + t.Fatalf("expected transport=iscsi, got %q", info.transport) + } +} + +func TestNodeStageVolume_TransportStickiness(t *testing.T) { + mn := newMockNVMeUtil() + mn.nvmeTCPAvailable = true + mn.getDeviceResult = "/dev/nvme0n1" + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[test-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + stagingPath := t.TempDir() + + // First stage via NVMe. + _, err := ns.NodeStageVolume(context.Background(), &csi.NodeStageVolumeRequest{ + VolumeId: "sticky-vol", + StagingTargetPath: stagingPath, + VolumeCapability: testVolCap(), + PublishContext: map[string]string{ + "iscsiAddr": "10.0.0.5:3260", + "iqn": "iqn.2024.com.seaweedfs:sticky-vol", + "nvmeAddr": "10.0.0.5:4420", + "nqn": "nqn.2024.com.seaweedfs:sticky-vol", + }, + }) + if err != nil { + t.Fatalf("NodeStageVolume: %v", err) + } + + // Verify staged as NVMe. + ns.stagedMu.Lock() + info := ns.staged["sticky-vol"] + ns.stagedMu.Unlock() + if info.transport != transportNVMe { + t.Fatalf("expected transport=nvme, got %q", info.transport) + } + + // Re-stage attempt (already mounted) → idempotent, transport stays. + mm.isMountedTargets[stagingPath] = true + _, err = ns.NodeStageVolume(context.Background(), &csi.NodeStageVolumeRequest{ + VolumeId: "sticky-vol", + StagingTargetPath: stagingPath, + VolumeCapability: testVolCap(), + PublishContext: map[string]string{ + "iscsiAddr": "10.0.0.5:3260", + "iqn": "iqn.2024.com.seaweedfs:sticky-vol", + // No NVMe fields this time. + }, + }) + if err != nil { + t.Fatalf("re-stage: %v", err) + } + + // Transport should still be NVMe from first stage. + ns.stagedMu.Lock() + info = ns.staged["sticky-vol"] + ns.stagedMu.Unlock() + if info.transport != transportNVMe { + t.Fatalf("expected transport=nvme after re-stage, got %q", info.transport) + } +} + +// --- Idempotency tests --- + +func TestNodeStageVolume_NVMe_AlreadyConnected(t *testing.T) { + mn := newMockNVMeUtil() + mn.nvmeTCPAvailable = true + mn.connected["nqn.2024.com.seaweedfs:precon-vol"] = true + mn.getDeviceResult = "/dev/nvme0n1" + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[test-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + stagingPath := t.TempDir() + + _, err := ns.NodeStageVolume(context.Background(), &csi.NodeStageVolumeRequest{ + VolumeId: "precon-vol", + StagingTargetPath: stagingPath, + VolumeCapability: testVolCap(), + PublishContext: map[string]string{ + "nvmeAddr": "10.0.0.5:4420", + "nqn": "nqn.2024.com.seaweedfs:precon-vol", + }, + }) + if err != nil { + t.Fatalf("NodeStageVolume: %v", err) + } + + // Connect should NOT have been called (already connected). + for _, c := range mn.calls { + if strings.HasPrefix(c, "connect:") { + t.Fatalf("expected no connect call (already connected), got: %v", mn.calls) + } + } +} + +func TestNodeStageVolume_NVMe_AlreadyMounted(t *testing.T) { + mn := newMockNVMeUtil() + mn.nvmeTCPAvailable = true + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[test-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + stagingPath := t.TempDir() + mm.isMountedTargets[stagingPath] = true // already mounted + + _, err := ns.NodeStageVolume(context.Background(), &csi.NodeStageVolumeRequest{ + VolumeId: "mounted-vol", + StagingTargetPath: stagingPath, + VolumeCapability: testVolCap(), + PublishContext: map[string]string{ + "nvmeAddr": "10.0.0.5:4420", + "nqn": "nqn.2024.com.seaweedfs:mounted-vol", + }, + }) + if err != nil { + t.Fatalf("NodeStageVolume: %v", err) + } + + // No NVMe or iSCSI calls should have been made. + if len(mn.calls) > 0 { + t.Fatalf("expected no NVMe calls, got: %v", mn.calls) + } + if len(mi.calls) > 0 { + t.Fatalf("expected no iSCSI calls, got: %v", mi.calls) + } +} + +func TestNodeStageVolume_NVMe_ConcurrentSameVolume(t *testing.T) { + mn := newMockNVMeUtil() + mn.nvmeTCPAvailable = true + mn.getDeviceResult = "/dev/nvme0n1" + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[test-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + stagingPath := t.TempDir() + ctx := context.Background() + + var wg sync.WaitGroup + errs := make([]error, 2) + for i := 0; i < 2; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + _, errs[idx] = ns.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{ + VolumeId: "concurrent-vol", + StagingTargetPath: stagingPath, + VolumeCapability: testVolCap(), + PublishContext: map[string]string{ + "nvmeAddr": "10.0.0.5:4420", + "nqn": "nqn.2024.com.seaweedfs:concurrent-vol", + }, + }) + }(i) + } + wg.Wait() + + // At least one should succeed. + success := false + for _, err := range errs { + if err == nil { + success = true + } + } + if !success { + t.Fatalf("expected at least one concurrent stage to succeed: %v, %v", errs[0], errs[1]) + } +} + +// --- Unstage tests --- + +func TestNodeUnstageVolume_NVMe(t *testing.T) { + mn := newMockNVMeUtil() + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[test-nvme] ", log.LstdFlags), + staged: map[string]*stagedVolumeInfo{ + "unstage-vol": { + nqn: "nqn.2024.com.seaweedfs:unstage-vol", + nvmeAddr: "10.0.0.5:4420", + transport: transportNVMe, + }, + }, + } + + stagingPath := t.TempDir() + + _, err := ns.NodeUnstageVolume(context.Background(), &csi.NodeUnstageVolumeRequest{ + VolumeId: "unstage-vol", + StagingTargetPath: stagingPath, + }) + if err != nil { + t.Fatalf("NodeUnstageVolume: %v", err) + } + + // Verify NVMe disconnect was called. + foundDisconnect := false + for _, c := range mn.calls { + if c == "disconnect:nqn.2024.com.seaweedfs:unstage-vol" { + foundDisconnect = true + } + } + if !foundDisconnect { + t.Fatalf("expected NVMe disconnect call, got: %v", mn.calls) + } + + // Verify iSCSI logout was NOT called. + for _, c := range mi.calls { + if strings.HasPrefix(c, "logout:") { + t.Fatalf("expected no iSCSI logout for NVMe volume, got: %v", mi.calls) + } + } +} + +func TestNodeUnstageVolume_NVMe_RestartFallback_TransportFile(t *testing.T) { + mn := newMockNVMeUtil() + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + nqnPrefix: "nqn.2024-01.com.seaweedfs:vol.", + iqnPrefix: "iqn.2024.com.seaweedfs", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[test-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), // empty (restart) + } + + // Write a .transport file indicating NVMe. + stagingPath := t.TempDir() + os.WriteFile(filepath.Join(stagingPath, transportFile), []byte("nvme"), 0600) + + _, err := ns.NodeUnstageVolume(context.Background(), &csi.NodeUnstageVolumeRequest{ + VolumeId: "restart-nvme-vol", + StagingTargetPath: stagingPath, + }) + if err != nil { + t.Fatalf("NodeUnstageVolume: %v", err) + } + + // Verify NVMe disconnect was called (derived NQN from nqnPrefix). + foundDisconnect := false + for _, c := range mn.calls { + if strings.HasPrefix(c, "disconnect:") { + foundDisconnect = true + } + } + if !foundDisconnect { + t.Fatalf("expected NVMe disconnect from transport file, got: %v", mn.calls) + } +} + +func TestNodeUnstageVolume_NVMe_RestartFallback_Probe(t *testing.T) { + mn := newMockNVMeUtil() + mn.connected["nqn.2024-01.com.seaweedfs:vol.probe-vol"] = true + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + nqnPrefix: "nqn.2024-01.com.seaweedfs:vol.", + iqnPrefix: "iqn.2024.com.seaweedfs", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[test-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), // empty (restart) + } + + stagingPath := t.TempDir() + // No .transport file — will probe NVMe connection. + + _, err := ns.NodeUnstageVolume(context.Background(), &csi.NodeUnstageVolumeRequest{ + VolumeId: "probe-vol", + StagingTargetPath: stagingPath, + }) + if err != nil { + t.Fatalf("NodeUnstageVolume: %v", err) + } + + // Verify NVMe disconnect was called (probe found connection). + foundDisconnect := false + for _, c := range mn.calls { + if strings.HasPrefix(c, "disconnect:") { + foundDisconnect = true + } + } + if !foundDisconnect { + t.Fatalf("expected NVMe disconnect from probe, got: %v", mn.calls) + } +} + +// --- Expand tests --- + +func TestNodeExpandVolume_NVMe(t *testing.T) { + mn := newMockNVMeUtil() + mn.getDeviceResult = "/dev/nvme0n1" + mn.getControllerResult = "/dev/nvme0" + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[test-nvme] ", log.LstdFlags), + staged: map[string]*stagedVolumeInfo{ + "expand-vol": { + nqn: "nqn.2024.com.seaweedfs:expand-vol", + transport: transportNVMe, + fsType: "ext4", + stagingPath: "/staging/expand-vol", + }, + }, + } + + _, err := ns.NodeExpandVolume(context.Background(), &csi.NodeExpandVolumeRequest{ + VolumeId: "expand-vol", + CapacityRange: &csi.CapacityRange{RequiredBytes: 8 * 1024 * 1024}, + }) + if err != nil { + t.Fatalf("NodeExpandVolume: %v", err) + } + + // Verify NVMe rescan was called. + foundRescan := false + for _, c := range mn.calls { + if strings.HasPrefix(c, "rescan:") { + foundRescan = true + } + } + if !foundRescan { + t.Fatalf("expected NVMe rescan call, got: %v", mn.calls) + } + + // Verify resize was called. + foundResize := false + for _, c := range mm.calls { + if strings.HasPrefix(c, "resizefs:") { + foundResize = true + } + } + if !foundResize { + t.Fatalf("expected resizefs call, got: %v", mm.calls) + } + + // Verify iSCSI rescan was NOT called. + for _, c := range mi.calls { + if strings.HasPrefix(c, "rescan:") { + t.Fatalf("expected no iSCSI rescan for NVMe volume, got: %v", mi.calls) + } + } +} + +// --- Failure cleanup tests --- + +func TestNodeStageVolume_NVMe_GetDeviceFails_Cleanup(t *testing.T) { + mn := newMockNVMeUtil() + mn.nvmeTCPAvailable = true + mn.getDeviceErr = errors.New("device not found") + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[test-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + stagingPath := t.TempDir() + + _, err := ns.NodeStageVolume(context.Background(), &csi.NodeStageVolumeRequest{ + VolumeId: "cleanup-vol", + StagingTargetPath: stagingPath, + VolumeCapability: testVolCap(), + PublishContext: map[string]string{ + "nvmeAddr": "10.0.0.5:4420", + "nqn": "nqn.2024.com.seaweedfs:cleanup-vol", + }, + }) + if err == nil { + t.Fatal("expected error when GetDeviceByNQN fails") + } + + // Verify Disconnect was called in cleanup. + foundDisconnect := false + for _, c := range mn.calls { + if strings.HasPrefix(c, "disconnect:") { + foundDisconnect = true + } + } + if !foundDisconnect { + t.Fatalf("expected Disconnect in cleanup after GetDevice failure, got: %v", mn.calls) + } +} + +func TestNodeStageVolume_NVMe_FormatFails_Cleanup(t *testing.T) { + mn := newMockNVMeUtil() + mn.nvmeTCPAvailable = true + mn.getDeviceResult = "/dev/nvme0n1" + mi := newMockISCSIUtil() + mm := newMockMountUtil() + mm.formatAndMountErr = errors.New("mkfs failed") + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[test-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + stagingPath := t.TempDir() + + _, err := ns.NodeStageVolume(context.Background(), &csi.NodeStageVolumeRequest{ + VolumeId: "format-fail-vol", + StagingTargetPath: stagingPath, + VolumeCapability: testVolCap(), + PublishContext: map[string]string{ + "nvmeAddr": "10.0.0.5:4420", + "nqn": "nqn.2024.com.seaweedfs:format-fail-vol", + }, + }) + if err == nil { + t.Fatal("expected error when format fails") + } + + // Verify Disconnect was called in cleanup. + foundDisconnect := false + for _, c := range mn.calls { + if strings.HasPrefix(c, "disconnect:") { + foundDisconnect = true + } + } + if !foundDisconnect { + t.Fatalf("expected Disconnect in cleanup after format failure, got: %v", mn.calls) + } +} + +// --- Controller/VolumeInfo tests --- + +func TestControllerPublish_NVMeFields(t *testing.T) { + mgr := newTestManager(t) + + backend := NewLocalVolumeBackend(mgr) + cs := &controllerServer{backend: backend} + + if err := mgr.CreateVolume("nvme-pub-vol", 4*1024*1024); err != nil { + t.Fatalf("create volume: %v", err) + } + + resp, err := cs.ControllerPublishVolume(context.Background(), &csi.ControllerPublishVolumeRequest{ + VolumeId: "nvme-pub-vol", + NodeId: "node-1", + }) + if err != nil { + t.Fatalf("ControllerPublishVolume: %v", err) + } + + // iSCSI fields should always be present. + if resp.PublishContext["iscsiAddr"] == "" { + t.Fatal("expected iscsiAddr in PublishContext") + } + if resp.PublishContext["iqn"] == "" { + t.Fatal("expected iqn in PublishContext") + } + + // NVMe fields are empty when VolumeManager has no NVMe config. + if resp.PublishContext["nvmeAddr"] != "" { + t.Fatalf("expected no nvmeAddr (NVMe not configured), got %q", resp.PublishContext["nvmeAddr"]) + } +} + +func TestControllerPublish_NoNVMe(t *testing.T) { + mgr := newTestManager(t) + + backend := NewLocalVolumeBackend(mgr) + cs := &controllerServer{backend: backend} + + if err := mgr.CreateVolume("iscsi-only-vol", 4*1024*1024); err != nil { + t.Fatalf("create volume: %v", err) + } + + resp, err := cs.ControllerPublishVolume(context.Background(), &csi.ControllerPublishVolumeRequest{ + VolumeId: "iscsi-only-vol", + NodeId: "node-1", + }) + if err != nil { + t.Fatalf("ControllerPublishVolume: %v", err) + } + + // Only iSCSI fields, no NVMe keys. + if _, ok := resp.PublishContext["nvmeAddr"]; ok { + t.Fatal("expected no nvmeAddr key in PublishContext when NVMe not configured") + } + if _, ok := resp.PublishContext["nqn"]; ok { + t.Fatal("expected no nqn key in PublishContext when NVMe not configured") + } +} + +func TestCreateVolume_NVMeContext(t *testing.T) { + dir := t.TempDir() + logger := log.New(os.Stderr, "[test-vm] ", log.LstdFlags) + mgr := NewVolumeManager(dir, "127.0.0.1:0", "iqn.2024.com.seaweedfs", logger, + VolumeManagerOpts{NvmeAddr: "10.0.0.5:4420", NQNPrefix: "nqn.2024-01.com.seaweedfs:vol."}) + if err := mgr.Start(context.Background()); err != nil { + t.Fatalf("start: %v", err) + } + t.Cleanup(func() { mgr.Stop() }) + + backend := NewLocalVolumeBackend(mgr) + cs := &controllerServer{backend: backend} + + resp, err := cs.CreateVolume(context.Background(), &csi.CreateVolumeRequest{ + Name: "nvme-ctx-vol", + VolumeCapabilities: testVolCaps(), + CapacityRange: &csi.CapacityRange{RequiredBytes: 4 * 1024 * 1024}, + }) + if err != nil { + t.Fatalf("CreateVolume: %v", err) + } + + vc := resp.Volume.VolumeContext + if vc["nvmeAddr"] != "10.0.0.5:4420" { + t.Fatalf("expected nvmeAddr=10.0.0.5:4420, got %q", vc["nvmeAddr"]) + } + if !strings.HasPrefix(vc["nqn"], "nqn.2024-01.com.seaweedfs:vol.") { + t.Fatalf("expected nqn prefix, got %q", vc["nqn"]) + } +} + +// --- Parity and edge-case tests (review findings) --- + +// TestNQNParity_CSIAndBlockService verifies that CSI VolumeManager and BlockService +// produce identical NQNs for the same volume name with the same prefix. +func TestNQNParity_CSIAndBlockService(t *testing.T) { + prefix := "nqn.2024-01.com.seaweedfs:vol." + names := []string{ + "simple-vol", + "pvc-abc123-def456", + "UPPERCASE-Volume", + "special.chars-here_ok", + "a-very-long-volume-name-that-might-need-truncation-or-hashing-to-stay-within-limits-xxxxxxxxx", + } + + for _, name := range names { + // CSI path: VolumeManager.volumeNQN → blockvol.BuildNQN(prefix, name) + dir := t.TempDir() + logger := log.New(os.Stderr, "[test] ", log.LstdFlags) + mgr := NewVolumeManager(dir, "127.0.0.1:0", "iqn.test", logger, + VolumeManagerOpts{NQNPrefix: prefix}) + if err := mgr.Start(context.Background()); err != nil { + t.Fatalf("start: %v", err) + } + csiNQN := mgr.VolumeNQN(name) + mgr.Stop() + + // BlockService path: blockvol.BuildNQN(prefix, name) + // This is what volume_server_block.go:registerVolume and NQN() use. + blockNQN := SanitizeIQN(name) // prefix + SanitizeIQN + blockNQN = prefix + blockNQN + + if csiNQN != blockNQN { + t.Errorf("NQN mismatch for %q: CSI=%q BlockService=%q", name, csiNQN, blockNQN) + } + } +} + +// TestIsConnected_ErrorClassification verifies that IsConnected distinguishes +// "NQN not found" from command/parse failures. +func TestIsConnected_ErrorClassification(t *testing.T) { + // Test with errNQNNotFound — should return false, nil. + r := &realNVMeUtil{} + + // We can't run real nvme commands on Windows, so we test the sentinel logic directly. + // Verify errNQNNotFound is a distinct sentinel. + if !errors.Is(errNQNNotFound, errNQNNotFound) { + t.Fatal("errNQNNotFound sentinel should be identifiable via errors.Is") + } + + // Verify it's different from a wrapped exec error. + execErr := fmt.Errorf("nvme list-subsys: command not found: %w", errors.New("exit code 127")) + if errors.Is(execErr, errNQNNotFound) { + t.Fatal("exec error should NOT match errNQNNotFound") + } + + // Verify the mock distinguishes correctly. + mn := newMockNVMeUtil() + + // Not connected (NQN absent) → (false, nil). + connected, err := mn.IsConnected(context.Background(), "nqn.absent") + if err != nil { + t.Fatalf("expected no error for absent NQN, got: %v", err) + } + if connected { + t.Fatal("expected not connected for absent NQN") + } + + // Connected → (true, nil). + mn.connected["nqn.present"] = true + connected, err = mn.IsConnected(context.Background(), "nqn.present") + if err != nil { + t.Fatalf("expected no error: %v", err) + } + if !connected { + t.Fatal("expected connected") + } + + _ = r // suppress unused warning, real path tested via integration on Linux +} + +// TestFindSubsys_Fixture verifies findSubsys path selection with a multi-path JSON fixture. +func TestFindSubsys_Fixture(t *testing.T) { + // Test the JSON parsing and path selection logic directly. + fixture := `{ + "Subsystems": [ + { + "NQN": "nqn.2024-01.com.seaweedfs:vol.test-vol", + "Paths": [ + {"Name": "nvme1", "Transport": "rdma", "State": "live"}, + {"Name": "nvme2", "Transport": "tcp", "State": "connecting"}, + {"Name": "nvme3", "Transport": "tcp", "State": "live"}, + {"Name": "", "Transport": "tcp", "State": "live"} + ] + }, + { + "NQN": "nqn.other:subsystem", + "Paths": [ + {"Name": "nvme0", "Transport": "tcp", "State": "live"} + ] + } + ] + }` + + var parsed nvmeListSubsysOutput + if err := json.Unmarshal([]byte(fixture), &parsed); err != nil { + t.Fatalf("parse fixture: %v", err) + } + + // Simulate findSubsys logic for target NQN. + nqn := "nqn.2024-01.com.seaweedfs:vol.test-vol" + var foundCtrl, foundDev string + for _, ss := range parsed.Subsystems { + if ss.NQN != nqn { + continue + } + var fallbackCtrl string + for _, p := range ss.Paths { + if p.Name == "" { + continue + } + ctrl := "/dev/" + p.Name + dev := ctrl + "n1" + if strings.EqualFold(p.Transport, "tcp") && strings.EqualFold(p.State, "live") { + foundCtrl = ctrl + foundDev = dev + break + } + if fallbackCtrl == "" { + fallbackCtrl = ctrl + } + } + if foundCtrl == "" && fallbackCtrl != "" { + foundCtrl = fallbackCtrl + foundDev = fallbackCtrl + "n1" + } + } + + // Should prefer nvme3 (tcp+live), not nvme1 (rdma) or nvme2 (connecting). + if foundCtrl != "/dev/nvme3" { + t.Fatalf("expected /dev/nvme3 (tcp+live), got %q", foundCtrl) + } + if foundDev != "/dev/nvme3n1" { + t.Fatalf("expected /dev/nvme3n1, got %q", foundDev) + } + + // Test NQN not found. + found := false + for _, ss := range parsed.Subsystems { + if ss.NQN == "nqn.nonexistent" { + found = true + } + } + if found { + t.Fatal("nonexistent NQN should not be found") + } +} + +// TestMasterBackend_NVMeFieldsAbsent verifies that MasterVolumeClient +// returns empty NVMe fields (deferred scope per CP10-2 design). +func TestMasterBackend_NVMeFieldsAbsent(t *testing.T) { + // We can't call real master gRPC, so verify the struct directly. + // The point: VolumeInfo from MasterVolumeClient has empty NvmeAddr/NQN. + info := &VolumeInfo{ + VolumeID: "master-vol", + ISCSIAddr: "10.0.0.5:3260", + IQN: "iqn.2024.com.seaweedfs:master-vol", + // NvmeAddr and NQN intentionally not set (master proto lacks these fields). + } + + if info.NvmeAddr != "" { + t.Fatalf("expected empty NvmeAddr from master backend, got %q", info.NvmeAddr) + } + if info.NQN != "" { + t.Fatalf("expected empty NQN from master backend, got %q", info.NQN) + } + + // Verify controller doesn't add NVMe keys when fields are empty. + cs := &controllerServer{backend: &staticBackend{info: info}} + resp, err := cs.ControllerPublishVolume(context.Background(), &csi.ControllerPublishVolumeRequest{ + VolumeId: "master-vol", + NodeId: "node-1", + }) + if err != nil { + t.Fatalf("ControllerPublishVolume: %v", err) + } + if _, ok := resp.PublishContext["nvmeAddr"]; ok { + t.Fatal("expected no nvmeAddr in PublishContext for master-backed volume") + } + if _, ok := resp.PublishContext["nqn"]; ok { + t.Fatal("expected no nqn in PublishContext for master-backed volume") + } +} + +// --- Issue 4: nvmeUtil == nil with NVMe metadata --- + +func TestNodeStageVolume_NVMeUtilNil_FallsToISCSI(t *testing.T) { + mi := newMockISCSIUtil() + mi.getDeviceResult = "/dev/sdb" + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: nil, // NVMe util not available + mountUtil: mm, + logger: log.New(os.Stderr, "[test-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + stagingPath := t.TempDir() + + // Both NVMe and iSCSI metadata present, but nvmeUtil is nil → must use iSCSI. + _, err := ns.NodeStageVolume(context.Background(), &csi.NodeStageVolumeRequest{ + VolumeId: "nil-util-vol", + StagingTargetPath: stagingPath, + VolumeCapability: testVolCap(), + PublishContext: map[string]string{ + "iscsiAddr": "10.0.0.5:3260", + "iqn": "iqn.2024.com.seaweedfs:nil-util-vol", + "nvmeAddr": "10.0.0.5:4420", + "nqn": "nqn.2024.com.seaweedfs:nil-util-vol", + }, + }) + if err != nil { + t.Fatalf("NodeStageVolume: %v", err) + } + + // iSCSI should be used. + if len(mi.calls) == 0 { + t.Fatal("expected iSCSI calls when nvmeUtil is nil") + } + + ns.stagedMu.Lock() + info := ns.staged["nil-util-vol"] + ns.stagedMu.Unlock() + if info.transport != transportISCSI { + t.Fatalf("expected transport=iscsi, got %q", info.transport) + } +} + +// --- Missing test: Expand with NVMe Rescan failure --- + +func TestNodeExpandVolume_NVMe_RescanFails(t *testing.T) { + mn := newMockNVMeUtil() + mn.rescanErr = errors.New("ns-rescan failed") + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[test-nvme] ", log.LstdFlags), + staged: map[string]*stagedVolumeInfo{ + "rescan-fail-vol": { + nqn: "nqn.2024.com.seaweedfs:rescan-fail-vol", + transport: transportNVMe, + fsType: "ext4", + stagingPath: "/staging/rescan-fail-vol", + }, + }, + } + + _, err := ns.NodeExpandVolume(context.Background(), &csi.NodeExpandVolumeRequest{ + VolumeId: "rescan-fail-vol", + CapacityRange: &csi.CapacityRange{RequiredBytes: 8 * 1024 * 1024}, + }) + if err == nil { + t.Fatal("expected error when NVMe rescan fails") + } + st, _ := status.FromError(err) + if st.Code() != codes.Internal { + t.Fatalf("expected Internal, got %v", st.Code()) + } + if !strings.Contains(err.Error(), "nvme rescan") { + t.Fatalf("expected 'nvme rescan' in error, got: %v", err) + } +} + +// --- Missing test: Unstage NVMe disconnect failure --- + +func TestNodeUnstageVolume_NVMe_DisconnectFails(t *testing.T) { + mn := newMockNVMeUtil() + mn.disconnectErr = errors.New("disconnect timeout") + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[test-nvme] ", log.LstdFlags), + staged: map[string]*stagedVolumeInfo{ + "disc-fail-vol": { + nqn: "nqn.2024.com.seaweedfs:disc-fail-vol", + nvmeAddr: "10.0.0.5:4420", + transport: transportNVMe, + }, + }, + } + + stagingPath := t.TempDir() + + _, err := ns.NodeUnstageVolume(context.Background(), &csi.NodeUnstageVolumeRequest{ + VolumeId: "disc-fail-vol", + StagingTargetPath: stagingPath, + }) + // Should return error (disconnect failure is surfaced). + if err == nil { + t.Fatal("expected error when NVMe disconnect fails") + } + if !strings.Contains(err.Error(), "disconnect timeout") { + t.Fatalf("expected disconnect error, got: %v", err) + } + + // Volume should remain in staged map for retry. + ns.stagedMu.Lock() + _, stillStaged := ns.staged["disc-fail-vol"] + ns.stagedMu.Unlock() + if !stillStaged { + t.Fatal("volume should remain in staged map after disconnect failure") + } +} + +// --- Missing test: Disconnect idempotency (already disconnected) --- + +func TestNodeUnstageVolume_NVMe_AlreadyDisconnected(t *testing.T) { + mn := newMockNVMeUtil() + // No disconnect error — idempotent success even though not connected. + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[test-nvme] ", log.LstdFlags), + staged: map[string]*stagedVolumeInfo{ + "already-disc-vol": { + nqn: "nqn.2024.com.seaweedfs:already-disc-vol", + nvmeAddr: "10.0.0.5:4420", + transport: transportNVMe, + }, + }, + } + + stagingPath := t.TempDir() + + _, err := ns.NodeUnstageVolume(context.Background(), &csi.NodeUnstageVolumeRequest{ + VolumeId: "already-disc-vol", + StagingTargetPath: stagingPath, + }) + if err != nil { + t.Fatalf("NodeUnstageVolume: %v", err) + } + + // Disconnect should be called (idempotent) and succeed. + foundDisconnect := false + for _, c := range mn.calls { + if strings.HasPrefix(c, "disconnect:") { + foundDisconnect = true + } + } + if !foundDisconnect { + t.Fatalf("expected disconnect call, got: %v", mn.calls) + } +} + +// --- Missing test: writeTransportFile error handling --- + +func TestWriteTransportFile_Error(t *testing.T) { + // Write to a non-existent directory → should return error. + err := writeTransportFile("/nonexistent/path/that/does/not/exist", transportNVMe) + if err == nil { + t.Fatal("expected error writing to non-existent directory") + } + if !strings.Contains(err.Error(), "write transport file") { + t.Fatalf("expected wrapped error, got: %v", err) + } +} + +// staticBackend is a minimal VolumeBackend for testing controller behavior. +type staticBackend struct { + info *VolumeInfo +} + +func (b *staticBackend) CreateVolume(_ context.Context, name string, sizeBytes uint64) (*VolumeInfo, error) { + return b.info, nil +} +func (b *staticBackend) DeleteVolume(_ context.Context, name string) error { return nil } +func (b *staticBackend) LookupVolume(_ context.Context, name string) (*VolumeInfo, error) { + return b.info, nil +} +func (b *staticBackend) CreateSnapshot(_ context.Context, volumeID string, snapID uint32) (*SnapshotInfo, error) { + return nil, nil +} +func (b *staticBackend) DeleteSnapshot(_ context.Context, volumeID string, snapID uint32) error { + return nil +} +func (b *staticBackend) ListSnapshots(_ context.Context, volumeID string) ([]*SnapshotInfo, error) { + return nil, nil +} +func (b *staticBackend) ExpandVolume(_ context.Context, volumeID string, newSizeBytes uint64) (uint64, error) { + return 0, nil +} diff --git a/weed/storage/blockvol/csi/nvme_util.go b/weed/storage/blockvol/csi/nvme_util.go new file mode 100644 index 000000000..a6ad001bd --- /dev/null +++ b/weed/storage/blockvol/csi/nvme_util.go @@ -0,0 +1,247 @@ +package csi + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net" + "os" + "os/exec" + "strings" + "time" +) + +// NVMeUtil provides NVMe/TCP initiator operations. +type NVMeUtil interface { + Connect(ctx context.Context, nqn, addr string) error + Disconnect(ctx context.Context, nqn string) error + IsConnected(ctx context.Context, nqn string) (bool, error) + GetDeviceByNQN(ctx context.Context, nqn string) (string, error) + GetControllerByNQN(ctx context.Context, nqn string) (string, error) + Rescan(ctx context.Context, nqn string) error + IsNVMeTCPAvailable() bool +} + +// realNVMeUtil uses nvme-cli commands. +type realNVMeUtil struct{} + +func (r *realNVMeUtil) Connect(ctx context.Context, nqn, addr string) error { + host, port, err := net.SplitHostPort(addr) + if err != nil { + return fmt.Errorf("nvme connect: invalid addr %q: %w", addr, err) + } + cmd := exec.CommandContext(ctx, "nvme", "connect", "-t", "tcp", "-n", nqn, "-a", host, "-s", port) + out, err := cmd.CombinedOutput() + if err != nil { + // Treat "already connected" as success (idempotent). + if strings.Contains(string(out), "already connected") { + return nil + } + return fmt.Errorf("nvme connect: %s: %w", string(out), err) + } + return nil +} + +func (r *realNVMeUtil) Disconnect(ctx context.Context, nqn string) error { + cmd := exec.CommandContext(ctx, "nvme", "disconnect", "-n", nqn) + out, err := cmd.CombinedOutput() + if err != nil { + // Treat "not connected" / "no subsystem" as success (idempotent). + outStr := string(out) + if strings.Contains(outStr, "not connected") || strings.Contains(outStr, "No subsystemtype") || strings.Contains(outStr, "Invalid argument") { + return nil + } + return fmt.Errorf("nvme disconnect: %s: %w", outStr, err) + } + return nil +} + +func (r *realNVMeUtil) IsConnected(ctx context.Context, nqn string) (bool, error) { + _, _, err := r.findSubsys(ctx, nqn) + if err != nil { + if errors.Is(err, errNQNNotFound) { + return false, nil // NQN not present = not connected + } + return false, err // command/parse failure — propagate + } + return true, nil +} + +// errNQNNotFound is returned by findSubsys when the NQN is not in the subsystem list. +// Callers use errors.Is to distinguish "not found" from command/parse errors. +var errNQNNotFound = errors.New("nvme: NQN not found") + +func (r *realNVMeUtil) GetDeviceByNQN(ctx context.Context, nqn string) (string, error) { + // Poll for device to appear (NVMe connect + device enumeration is async). + deadline := time.After(10 * time.Second) + ticker := time.NewTicker(200 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return "", ctx.Err() + case <-deadline: + return "", fmt.Errorf("timeout waiting for NVMe device for NQN %s", nqn) + case <-ticker.C: + _, dev, err := r.findSubsys(ctx, nqn) + if err != nil { + continue + } + if dev != "" { + return dev, nil + } + } + } +} + +func (r *realNVMeUtil) GetControllerByNQN(ctx context.Context, nqn string) (string, error) { + ctrl, _, err := r.findSubsys(ctx, nqn) + if err != nil { + return "", err + } + if ctrl == "" { + return "", fmt.Errorf("no controller found for NQN %s", nqn) + } + return ctrl, nil +} + +func (r *realNVMeUtil) Rescan(ctx context.Context, nqn string) error { + ctrl, err := r.GetControllerByNQN(ctx, nqn) + if err != nil { + return fmt.Errorf("nvme rescan: find controller: %w", err) + } + cmd := exec.CommandContext(ctx, "nvme", "ns-rescan", ctrl) + out, errCmd := cmd.CombinedOutput() + if errCmd != nil { + return fmt.Errorf("nvme ns-rescan %s: %s: %w", ctrl, string(out), errCmd) + } + return nil +} + +// IsNVMeTCPAvailable checks if the nvme_tcp kernel module is loaded (read-only). +func (r *realNVMeUtil) IsNVMeTCPAvailable() bool { + _, err := os.Stat("/sys/module/nvme_tcp") + return err == nil +} + +// nvmeListSubsysOutput represents the JSON output from `nvme list-subsys -o json`. +type nvmeListSubsysOutput struct { + Subsystems []nvmeSubsys `json:"Subsystems"` +} + +type nvmeSubsys struct { + NQN string `json:"NQN"` + Paths []nvmePath `json:"Paths"` + // Some nvme-cli versions use "Namespaces" instead. +} + +type nvmePath struct { + Name string `json:"Name"` // controller name, e.g. "nvme0" + Transport string `json:"Transport"` + State string `json:"State"` +} + +// findSubsys parses `nvme list-subsys -o json` to find controller and namespace device +// for a given NQN. Returns (controller path, namespace device path, error). +// Returns errNQNNotFound (sentinel) when the NQN is absent from the subsystem list. +// Returns a non-sentinel error for command execution or JSON parse failures. +func (r *realNVMeUtil) findSubsys(ctx context.Context, nqn string) (string, string, error) { + cmd := exec.CommandContext(ctx, "nvme", "list-subsys", "-o", "json") + out, err := cmd.CombinedOutput() + if err != nil { + return "", "", fmt.Errorf("nvme list-subsys: %s: %w", string(out), err) + } + + var parsed nvmeListSubsysOutput + if err := json.Unmarshal(out, &parsed); err != nil { + return "", "", fmt.Errorf("nvme list-subsys: parse json: %w", err) + } + + for _, ss := range parsed.Subsystems { + if ss.NQN != nqn { + continue + } + // Prefer a live TCP path. Fall back to any path with a name. + var fallbackCtrl string + for _, p := range ss.Paths { + if p.Name == "" { + continue + } + ctrl := "/dev/" + p.Name + dev := ctrl + "n1" + // Prefer Transport=tcp + State=live. + if strings.EqualFold(p.Transport, "tcp") && strings.EqualFold(p.State, "live") { + return ctrl, dev, nil + } + if fallbackCtrl == "" { + fallbackCtrl = ctrl + } + } + if fallbackCtrl != "" { + return fallbackCtrl, fallbackCtrl + "n1", nil + } + return "", "", fmt.Errorf("NQN %s found but no controller paths", nqn) + } + return "", "", errNQNNotFound +} + +// mockNVMeUtil is a test double for NVMeUtil. +type mockNVMeUtil struct { + connectErr error + disconnectErr error + getDeviceResult string + getDeviceErr error + getControllerResult string + getControllerErr error + rescanErr error + nvmeTCPAvailable bool + connected map[string]bool + calls []string +} + +func newMockNVMeUtil() *mockNVMeUtil { + return &mockNVMeUtil{connected: make(map[string]bool)} +} + +func (m *mockNVMeUtil) Connect(_ context.Context, nqn, addr string) error { + m.calls = append(m.calls, "connect:"+nqn+":"+addr) + if m.connectErr != nil { + return m.connectErr + } + m.connected[nqn] = true + return nil +} + +func (m *mockNVMeUtil) Disconnect(_ context.Context, nqn string) error { + m.calls = append(m.calls, "disconnect:"+nqn) + if m.disconnectErr != nil { + return m.disconnectErr + } + delete(m.connected, nqn) + return nil +} + +func (m *mockNVMeUtil) IsConnected(_ context.Context, nqn string) (bool, error) { + return m.connected[nqn], nil +} + +func (m *mockNVMeUtil) GetDeviceByNQN(_ context.Context, nqn string) (string, error) { + m.calls = append(m.calls, "getdevice:"+nqn) + return m.getDeviceResult, m.getDeviceErr +} + +func (m *mockNVMeUtil) GetControllerByNQN(_ context.Context, nqn string) (string, error) { + m.calls = append(m.calls, "getcontroller:"+nqn) + return m.getControllerResult, m.getControllerErr +} + +func (m *mockNVMeUtil) Rescan(_ context.Context, nqn string) error { + m.calls = append(m.calls, "rescan:"+nqn) + return m.rescanErr +} + +func (m *mockNVMeUtil) IsNVMeTCPAvailable() bool { + return m.nvmeTCPAvailable +} diff --git a/weed/storage/blockvol/csi/qa_cp102_nvme_node_test.go b/weed/storage/blockvol/csi/qa_cp102_nvme_node_test.go new file mode 100644 index 000000000..73926a8e4 --- /dev/null +++ b/weed/storage/blockvol/csi/qa_cp102_nvme_node_test.go @@ -0,0 +1,1088 @@ +package csi + +import ( + "context" + "errors" + "log" + "os" + "path/filepath" + "strings" + "sync" + "testing" + + "github.com/container-storage-interface/spec/lib/go/csi" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// ============================================================ +// QA-NVME-CP102: Adversarial tests for CSI NVMe transport path +// ============================================================ + +// --- QA-NVME-1: Transport file corruption --- + +// readTransportFile should return "" for garbage content, preventing +// the node from selecting a wrong transport after restart. +func TestQA_NVMe_TransportFile_GarbageContent(t *testing.T) { + mn := newMockNVMeUtil() + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + nqnPrefix: "nqn.2024-01.com.seaweedfs:vol.", + iqnPrefix: "iqn.2024.com.seaweedfs", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[qa-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), // empty (restart) + } + + stagingPath := t.TempDir() + + // Write garbage into .transport file. + os.WriteFile(filepath.Join(stagingPath, transportFile), []byte("rdma-nonsense"), 0600) + + // Unstage should fall back to iSCSI (default) since "rdma-nonsense" is invalid. + _, err := ns.NodeUnstageVolume(context.Background(), &csi.NodeUnstageVolumeRequest{ + VolumeId: "garbage-transport-vol", + StagingTargetPath: stagingPath, + }) + if err != nil { + t.Fatalf("NodeUnstageVolume: %v", err) + } + + // iSCSI logout should be called (default fallback). + foundLogout := false + for _, c := range mi.calls { + if strings.HasPrefix(c, "logout:") { + foundLogout = true + } + } + if !foundLogout { + t.Fatalf("expected iSCSI logout (default fallback for garbage transport), got: mi=%v mn=%v", mi.calls, mn.calls) + } + + // NVMe disconnect should NOT be called. + for _, c := range mn.calls { + if strings.HasPrefix(c, "disconnect:") { + t.Fatalf("NVMe disconnect should not be called for garbage transport file, got: %v", mn.calls) + } + } +} + +// Empty .transport file should also default to iSCSI. +func TestQA_NVMe_TransportFile_Empty(t *testing.T) { + stagingPath := t.TempDir() + os.WriteFile(filepath.Join(stagingPath, transportFile), []byte(""), 0600) + result := readTransportFile(stagingPath) + if result != "" { + t.Fatalf("expected empty string for empty .transport, got %q", result) + } +} + +// .transport file with extra whitespace should not match. +func TestQA_NVMe_TransportFile_Whitespace(t *testing.T) { + stagingPath := t.TempDir() + os.WriteFile(filepath.Join(stagingPath, transportFile), []byte("nvme\n"), 0600) + result := readTransportFile(stagingPath) + if result != "" { + t.Fatalf("expected empty string for 'nvme\\n', got %q", result) + } +} + +// --- QA-NVME-2: IsConnected error during stage --- + +// If IsConnected returns an error (not false), stage should fail — don't blindly connect. +func TestQA_NVMe_Stage_IsConnectedError(t *testing.T) { + mn := newMockNVMeUtil() + mn.nvmeTCPAvailable = true + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + // Override IsConnected to return error. + errCheck := errors.New("nvme list-subsys: permission denied") + brokenMn := &isConnectedErrorNVMe{mockNVMeUtil: mn, err: errCheck} + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: brokenMn, + mountUtil: mm, + logger: log.New(os.Stderr, "[qa-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + stagingPath := t.TempDir() + + _, err := ns.NodeStageVolume(context.Background(), &csi.NodeStageVolumeRequest{ + VolumeId: "check-error-vol", + StagingTargetPath: stagingPath, + VolumeCapability: testVolCap(), + PublishContext: map[string]string{ + "nvmeAddr": "10.0.0.5:4420", + "nqn": "nqn.2024.com.seaweedfs:check-error-vol", + }, + }) + if err == nil { + t.Fatal("expected error when IsConnected fails") + } + st, _ := status.FromError(err) + if st.Code() != codes.Internal { + t.Fatalf("expected Internal, got %v", st.Code()) + } + if !strings.Contains(err.Error(), "permission denied") { + t.Fatalf("expected permission denied in error, got: %v", err) + } + + // Should NOT have attempted Connect. + for _, c := range mn.calls { + if strings.HasPrefix(c, "connect:") { + t.Fatalf("should not attempt connect after IsConnected error, got: %v", mn.calls) + } + } +} + +// --- QA-NVME-3: Partial NVMe metadata --- + +// NVMe addr present but NQN missing → should fall through to iSCSI. +func TestQA_NVMe_Stage_PartialMetadata_MissingNQN(t *testing.T) { + mn := newMockNVMeUtil() + mn.nvmeTCPAvailable = true + mi := newMockISCSIUtil() + mi.getDeviceResult = "/dev/sdb" + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[qa-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + stagingPath := t.TempDir() + + _, err := ns.NodeStageVolume(context.Background(), &csi.NodeStageVolumeRequest{ + VolumeId: "partial-nvme-vol", + StagingTargetPath: stagingPath, + VolumeCapability: testVolCap(), + PublishContext: map[string]string{ + "iscsiAddr": "10.0.0.5:3260", + "iqn": "iqn.2024.com.seaweedfs:partial-nvme-vol", + "nvmeAddr": "10.0.0.5:4420", + // NQN intentionally missing + }, + }) + if err != nil { + t.Fatalf("NodeStageVolume: %v", err) + } + + // Should have used iSCSI (NVMe metadata incomplete). + if len(mi.calls) == 0 { + t.Fatal("expected iSCSI calls") + } + if len(mn.calls) > 0 { + t.Fatalf("expected no NVMe calls with missing NQN, got: %v", mn.calls) + } +} + +// NQN present but addr missing → should fall through to iSCSI. +func TestQA_NVMe_Stage_PartialMetadata_MissingAddr(t *testing.T) { + mn := newMockNVMeUtil() + mn.nvmeTCPAvailable = true + mi := newMockISCSIUtil() + mi.getDeviceResult = "/dev/sdb" + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[qa-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + stagingPath := t.TempDir() + + _, err := ns.NodeStageVolume(context.Background(), &csi.NodeStageVolumeRequest{ + VolumeId: "partial-nvme-vol2", + StagingTargetPath: stagingPath, + VolumeCapability: testVolCap(), + PublishContext: map[string]string{ + "iscsiAddr": "10.0.0.5:3260", + "iqn": "iqn.2024.com.seaweedfs:partial-nvme-vol2", + // nvmeAddr intentionally missing + "nqn": "nqn.2024.com.seaweedfs:partial-nvme-vol2", + }, + }) + if err != nil { + t.Fatalf("NodeStageVolume: %v", err) + } + + // Should have used iSCSI. + if len(mi.calls) == 0 { + t.Fatal("expected iSCSI calls") + } + if len(mn.calls) > 0 { + t.Fatalf("expected no NVMe calls with missing addr, got: %v", mn.calls) + } +} + +// --- QA-NVME-4: NVMe-only (no iSCSI metadata) --- + +// Volume with ONLY NVMe metadata, no iSCSI. Connect failure should +// return error (no fallback possible). +func TestQA_NVMe_Stage_NVMeOnlyNoISCSI_ConnectFails(t *testing.T) { + mn := newMockNVMeUtil() + mn.nvmeTCPAvailable = true + mn.connectErr = errors.New("connection refused") + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[qa-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + stagingPath := t.TempDir() + + _, err := ns.NodeStageVolume(context.Background(), &csi.NodeStageVolumeRequest{ + VolumeId: "nvme-only-fail-vol", + StagingTargetPath: stagingPath, + VolumeCapability: testVolCap(), + PublishContext: map[string]string{ + // NO iSCSI metadata at all + "nvmeAddr": "10.0.0.5:4420", + "nqn": "nqn.2024.com.seaweedfs:nvme-only-fail-vol", + }, + }) + if err == nil { + t.Fatal("expected error when NVMe-only connect fails") + } + // Must not have tried iSCSI. + if len(mi.calls) > 0 { + t.Fatalf("should not attempt iSCSI when no iSCSI metadata, got: %v", mi.calls) + } +} + +// --- QA-NVME-5: No transport at all --- + +// No iSCSI, no NVMe, no local mgr → FailedPrecondition. +func TestQA_NVMe_Stage_NoTransportAtAll(t *testing.T) { + mn := newMockNVMeUtil() + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[qa-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + stagingPath := t.TempDir() + + _, err := ns.NodeStageVolume(context.Background(), &csi.NodeStageVolumeRequest{ + VolumeId: "no-transport-vol", + StagingTargetPath: stagingPath, + VolumeCapability: testVolCap(), + PublishContext: map[string]string{}, // empty + }) + if err == nil { + t.Fatal("expected error with no transport") + } + st, _ := status.FromError(err) + if st.Code() != codes.FailedPrecondition { + t.Fatalf("expected FailedPrecondition, got %v", st.Code()) + } +} + +// --- QA-NVME-6: Cascading failures during unstage --- + +// Both unmount AND disconnect fail → first error returned, volume stays staged. +func TestQA_NVMe_Unstage_CascadingFailures(t *testing.T) { + mn := newMockNVMeUtil() + mn.disconnectErr = errors.New("disconnect failed") + mi := newMockISCSIUtil() + mm := newMockMountUtil() + mm.unmountErr = errors.New("device busy") + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[qa-nvme] ", log.LstdFlags), + staged: map[string]*stagedVolumeInfo{ + "cascade-vol": { + nqn: "nqn.2024.com.seaweedfs:cascade-vol", + nvmeAddr: "10.0.0.5:4420", + transport: transportNVMe, + }, + }, + } + + stagingPath := t.TempDir() + + _, err := ns.NodeUnstageVolume(context.Background(), &csi.NodeUnstageVolumeRequest{ + VolumeId: "cascade-vol", + StagingTargetPath: stagingPath, + }) + if err == nil { + t.Fatal("expected error with cascading failures") + } + // First error should be the unmount error (it's tried first). + if !strings.Contains(err.Error(), "device busy") { + t.Fatalf("expected unmount error first, got: %v", err) + } + + // Volume should remain staged for retry. + ns.stagedMu.Lock() + _, stillStaged := ns.staged["cascade-vol"] + ns.stagedMu.Unlock() + if !stillStaged { + t.Fatal("volume should remain in staged map after cascading failure") + } + + // Disconnect should still be attempted even after unmount failure. + foundDisconnect := false + for _, c := range mn.calls { + if strings.HasPrefix(c, "disconnect:") { + foundDisconnect = true + } + } + if !foundDisconnect { + t.Fatal("disconnect should still be attempted even after unmount failure") + } +} + +// --- QA-NVME-7: Expand with nvmeUtil nil but transport=NVMe --- + +// This can happen if the CSI plugin restarts without NVMe support but +// a volume was previously staged via NVMe. +func TestQA_NVMe_Expand_NVMeUtilNil(t *testing.T) { + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: nil, // NVMe support gone after restart + mountUtil: mm, + logger: log.New(os.Stderr, "[qa-nvme] ", log.LstdFlags), + staged: map[string]*stagedVolumeInfo{ + "orphan-nvme-vol": { + nqn: "nqn.2024.com.seaweedfs:orphan-nvme-vol", + transport: transportNVMe, + fsType: "ext4", + stagingPath: "/staging/orphan", + }, + }, + } + + _, err := ns.NodeExpandVolume(context.Background(), &csi.NodeExpandVolumeRequest{ + VolumeId: "orphan-nvme-vol", + CapacityRange: &csi.CapacityRange{RequiredBytes: 16 * 1024 * 1024}, + }) + if err == nil { + t.Fatal("expected error when nvmeUtil is nil for NVMe transport") + } + st, _ := status.FromError(err) + if st.Code() != codes.Internal { + t.Fatalf("expected Internal, got %v", st.Code()) + } + if !strings.Contains(err.Error(), "nvme util not available") { + t.Fatalf("expected 'nvme util not available', got: %v", err) + } +} + +// --- QA-NVME-8: Expand on unstaged volume --- + +func TestQA_NVMe_Expand_NotStaged(t *testing.T) { + mn := newMockNVMeUtil() + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[qa-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + _, err := ns.NodeExpandVolume(context.Background(), &csi.NodeExpandVolumeRequest{ + VolumeId: "nonexistent-vol", + CapacityRange: &csi.CapacityRange{RequiredBytes: 8 * 1024 * 1024}, + }) + if err == nil { + t.Fatal("expected error expanding unstaged volume") + } + st, _ := status.FromError(err) + if st.Code() != codes.FailedPrecondition { + t.Fatalf("expected FailedPrecondition, got %v", st.Code()) + } +} + +// --- QA-NVME-9: Concurrent stage NVMe + unstage --- + +// Race between staging via NVMe and unstaging the same volume. +func TestQA_NVMe_ConcurrentStageUnstage(t *testing.T) { + mn := newMockNVMeUtil() + mn.nvmeTCPAvailable = true + mn.getDeviceResult = "/dev/nvme0n1" + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[qa-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + stagingPath := t.TempDir() + ctx := context.Background() + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + ns.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{ + VolumeId: "race-vol", + StagingTargetPath: stagingPath, + VolumeCapability: testVolCap(), + PublishContext: map[string]string{ + "nvmeAddr": "10.0.0.5:4420", + "nqn": "nqn.2024.com.seaweedfs:race-vol", + }, + }) + }() + + go func() { + defer wg.Done() + ns.NodeUnstageVolume(ctx, &csi.NodeUnstageVolumeRequest{ + VolumeId: "race-vol", + StagingTargetPath: stagingPath, + }) + }() + + wg.Wait() + // No panic is the main assertion. The staged map should be in a consistent state. + ns.stagedMu.Lock() + // We don't assert specific state because the race outcome is non-deterministic. + ns.stagedMu.Unlock() +} + +// --- QA-NVME-10: Rapid stage/unstage cycles --- + +func TestQA_NVMe_RapidStageUnstageCycles(t *testing.T) { + mn := newMockNVMeUtil() + mn.nvmeTCPAvailable = true + mn.getDeviceResult = "/dev/nvme0n1" + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[qa-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + ctx := context.Background() + + for i := 0; i < 20; i++ { + stagingPath := t.TempDir() + _, err := ns.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{ + VolumeId: "rapid-vol", + StagingTargetPath: stagingPath, + VolumeCapability: testVolCap(), + PublishContext: map[string]string{ + "nvmeAddr": "10.0.0.5:4420", + "nqn": "nqn.2024.com.seaweedfs:rapid-vol", + }, + }) + if err != nil { + t.Fatalf("stage cycle %d: %v", i, err) + } + _, err = ns.NodeUnstageVolume(ctx, &csi.NodeUnstageVolumeRequest{ + VolumeId: "rapid-vol", + StagingTargetPath: stagingPath, + }) + if err != nil { + t.Fatalf("unstage cycle %d: %v", i, err) + } + } + + // Verify staged map is clean after all cycles. + ns.stagedMu.Lock() + if _, ok := ns.staged["rapid-vol"]; ok { + t.Fatal("rapid-vol should not be in staged map after all cycles complete") + } + ns.stagedMu.Unlock() +} + +// --- QA-NVME-11: VolumeContext vs PublishContext precedence --- + +// PublishContext should take precedence over VolumeContext for NVMe metadata. +func TestQA_NVMe_ContextPrecedence(t *testing.T) { + mn := newMockNVMeUtil() + mn.nvmeTCPAvailable = true + mn.getDeviceResult = "/dev/nvme0n1" + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[qa-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + stagingPath := t.TempDir() + + _, err := ns.NodeStageVolume(context.Background(), &csi.NodeStageVolumeRequest{ + VolumeId: "precedence-vol", + StagingTargetPath: stagingPath, + VolumeCapability: testVolCap(), + PublishContext: map[string]string{ + "nvmeAddr": "10.0.0.1:4420", + "nqn": "nqn.publish.context:precedence-vol", + }, + VolumeContext: map[string]string{ + "nvmeAddr": "10.0.0.2:4420", + "nqn": "nqn.volume.context:precedence-vol", + }, + }) + if err != nil { + t.Fatalf("NodeStageVolume: %v", err) + } + + // Verify PublishContext NQN was used (not VolumeContext). + foundCorrectConnect := false + for _, c := range mn.calls { + if strings.Contains(c, "nqn.publish.context") && strings.Contains(c, "10.0.0.1:4420") { + foundCorrectConnect = true + } + if strings.Contains(c, "nqn.volume.context") { + t.Fatalf("VolumeContext should not be used when PublishContext has NVMe data, got: %v", mn.calls) + } + } + if !foundCorrectConnect { + t.Fatalf("expected connect with PublishContext NQN, got: %v", mn.calls) + } +} + +// VolumeContext should be used when PublishContext has no NVMe data. +func TestQA_NVMe_VolumeContextFallback(t *testing.T) { + mn := newMockNVMeUtil() + mn.nvmeTCPAvailable = true + mn.getDeviceResult = "/dev/nvme0n1" + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[qa-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + stagingPath := t.TempDir() + + _, err := ns.NodeStageVolume(context.Background(), &csi.NodeStageVolumeRequest{ + VolumeId: "volctx-vol", + StagingTargetPath: stagingPath, + VolumeCapability: testVolCap(), + PublishContext: map[string]string{ + // No NVMe keys in PublishContext + "iscsiAddr": "10.0.0.5:3260", + "iqn": "iqn.2024.com.seaweedfs:volctx-vol", + }, + VolumeContext: map[string]string{ + "nvmeAddr": "10.0.0.3:4420", + "nqn": "nqn.volume.context:volctx-vol", + }, + }) + if err != nil { + t.Fatalf("NodeStageVolume: %v", err) + } + + // Verify VolumeContext NVMe was used. + foundVolCtxConnect := false + for _, c := range mn.calls { + if strings.Contains(c, "nqn.volume.context") { + foundVolCtxConnect = true + } + } + if !foundVolCtxConnect { + t.Fatalf("expected connect with VolumeContext NQN, got: %v", mn.calls) + } +} + +// --- QA-NVME-12: Restart probe with NVMe probe failure --- + +// Unstage after restart: no .transport file, probe returns error → default to iSCSI. +func TestQA_NVMe_Unstage_ProbeFails_DefaultsISCSI(t *testing.T) { + mn := &isConnectedErrorNVMe{ + mockNVMeUtil: newMockNVMeUtil(), + err: errors.New("probe failed"), + } + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + nqnPrefix: "nqn.2024-01.com.seaweedfs:vol.", + iqnPrefix: "iqn.2024.com.seaweedfs", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[qa-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), // empty (restart) + } + + stagingPath := t.TempDir() + // No .transport file. + + _, err := ns.NodeUnstageVolume(context.Background(), &csi.NodeUnstageVolumeRequest{ + VolumeId: "probe-fail-vol", + StagingTargetPath: stagingPath, + }) + if err != nil { + t.Fatalf("NodeUnstageVolume: %v", err) + } + + // Should default to iSCSI since probe errored (not connected). + foundLogout := false + for _, c := range mi.calls { + if strings.HasPrefix(c, "logout:") { + foundLogout = true + } + } + if !foundLogout { + t.Fatalf("expected iSCSI logout (default after probe error), got: mi=%v", mi.calls) + } +} + +// --- QA-NVME-13: Stage with cancelled context --- + +func TestQA_NVMe_Stage_CancelledContext(t *testing.T) { + mn := newMockNVMeUtil() + mn.nvmeTCPAvailable = true + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[qa-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // Cancel immediately. + + stagingPath := t.TempDir() + + // With mock, context cancellation won't propagate (mocks don't check ctx). + // But this verifies no panic or data corruption with a cancelled context. + ns.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{ + VolumeId: "cancel-vol", + StagingTargetPath: stagingPath, + VolumeCapability: testVolCap(), + PublishContext: map[string]string{ + "nvmeAddr": "10.0.0.5:4420", + "nqn": "nqn.2024.com.seaweedfs:cancel-vol", + }, + }) + // No panic is the assertion. +} + +// --- QA-NVME-14: Unstage NVMe with nil nvmeUtil after restart --- + +// After restart, nvmeUtil may be nil but .transport says NVMe. +// Unstage should handle gracefully (skip disconnect, don't crash). +func TestQA_NVMe_Unstage_NVMeUtilNil_TransportFileNVMe(t *testing.T) { + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + nqnPrefix: "nqn.2024-01.com.seaweedfs:vol.", + iqnPrefix: "iqn.2024.com.seaweedfs", + iscsiUtil: mi, + nvmeUtil: nil, // NVMe not available after restart + mountUtil: mm, + logger: log.New(os.Stderr, "[qa-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + stagingPath := t.TempDir() + os.WriteFile(filepath.Join(stagingPath, transportFile), []byte("nvme"), 0600) + + _, err := ns.NodeUnstageVolume(context.Background(), &csi.NodeUnstageVolumeRequest{ + VolumeId: "nil-util-unstage-vol", + StagingTargetPath: stagingPath, + }) + // Should succeed — skip disconnect since nvmeUtil is nil, guard checks nqn != "" && nvmeUtil != nil. + if err != nil { + t.Fatalf("expected success (graceful skip), got: %v", err) + } +} + +// --- QA-NVME-15: Concurrent stage of different volumes --- + +func TestQA_NVMe_ConcurrentDifferentVolumes(t *testing.T) { + mn := newMockNVMeUtil() + mn.nvmeTCPAvailable = true + mn.getDeviceResult = "/dev/nvme0n1" + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[qa-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + ctx := context.Background() + var wg sync.WaitGroup + + for i := 0; i < 5; i++ { + wg.Add(1) + volID := strings.Replace("par-vol-X", "X", string(rune('a'+i)), 1) + stagingPath := t.TempDir() + go func(vid, sp string) { + defer wg.Done() + ns.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{ + VolumeId: vid, + StagingTargetPath: sp, + VolumeCapability: testVolCap(), + PublishContext: map[string]string{ + "nvmeAddr": "10.0.0.5:4420", + "nqn": "nqn.2024.com.seaweedfs:" + vid, + }, + }) + }(volID, stagingPath) + } + wg.Wait() + + // All 5 volumes should be staged. No panic. + ns.stagedMu.Lock() + count := len(ns.staged) + ns.stagedMu.Unlock() + if count != 5 { + t.Fatalf("expected 5 staged volumes, got %d", count) + } +} + +// --- QA-NVME-16: Missing required fields --- + +func TestQA_NVMe_Stage_MissingVolumeID(t *testing.T) { + mn := newMockNVMeUtil() + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[qa-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + _, err := ns.NodeStageVolume(context.Background(), &csi.NodeStageVolumeRequest{ + VolumeId: "", + StagingTargetPath: "/some/path", + VolumeCapability: testVolCap(), + }) + if err == nil { + t.Fatal("expected error for missing volume ID") + } + st, _ := status.FromError(err) + if st.Code() != codes.InvalidArgument { + t.Fatalf("expected InvalidArgument, got %v", st.Code()) + } +} + +func TestQA_NVMe_Stage_MissingStagingPath(t *testing.T) { + mn := newMockNVMeUtil() + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[qa-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + _, err := ns.NodeStageVolume(context.Background(), &csi.NodeStageVolumeRequest{ + VolumeId: "some-vol", + StagingTargetPath: "", + VolumeCapability: testVolCap(), + }) + if err == nil { + t.Fatal("expected error for missing staging path") + } + st, _ := status.FromError(err) + if st.Code() != codes.InvalidArgument { + t.Fatalf("expected InvalidArgument, got %v", st.Code()) + } +} + +func TestQA_NVMe_Stage_MissingCapability(t *testing.T) { + mn := newMockNVMeUtil() + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[qa-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + _, err := ns.NodeStageVolume(context.Background(), &csi.NodeStageVolumeRequest{ + VolumeId: "some-vol", + StagingTargetPath: "/some/path", + VolumeCapability: nil, + }) + if err == nil { + t.Fatal("expected error for missing capability") + } + st, _ := status.FromError(err) + if st.Code() != codes.InvalidArgument { + t.Fatalf("expected InvalidArgument, got %v", st.Code()) + } +} + +// --- QA-NVME-17: Unstage idempotency (not in map, no .transport) --- + +// Unstage a volume that was never staged and has no .transport file. +// Should succeed (idempotent). +func TestQA_NVMe_Unstage_NeverStaged(t *testing.T) { + mn := newMockNVMeUtil() + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + nqnPrefix: "nqn.2024-01.com.seaweedfs:vol.", + iqnPrefix: "iqn.2024.com.seaweedfs", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[qa-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + stagingPath := t.TempDir() + + _, err := ns.NodeUnstageVolume(context.Background(), &csi.NodeUnstageVolumeRequest{ + VolumeId: "never-staged-vol", + StagingTargetPath: stagingPath, + }) + if err != nil { + t.Fatalf("expected idempotent success, got: %v", err) + } +} + +// --- QA-NVME-18: Transport stickiness across iSCSI→NVMe upgrade --- + +// Volume first staged via iSCSI, then re-staged (after controller re-publishes with NVMe). +// Transport should stay iSCSI until explicit unstage+re-stage. +func TestQA_NVMe_TransportSticky_ISCSIToNVMe(t *testing.T) { + mn := newMockNVMeUtil() + mn.nvmeTCPAvailable = true + mn.getDeviceResult = "/dev/nvme0n1" + mi := newMockISCSIUtil() + mi.getDeviceResult = "/dev/sdb" + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[qa-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + stagingPath := t.TempDir() + + // First stage: iSCSI only. + _, err := ns.NodeStageVolume(context.Background(), &csi.NodeStageVolumeRequest{ + VolumeId: "upgrade-vol", + StagingTargetPath: stagingPath, + VolumeCapability: testVolCap(), + PublishContext: map[string]string{ + "iscsiAddr": "10.0.0.5:3260", + "iqn": "iqn.2024.com.seaweedfs:upgrade-vol", + }, + }) + if err != nil { + t.Fatalf("first stage (iSCSI): %v", err) + } + + ns.stagedMu.Lock() + info := ns.staged["upgrade-vol"] + ns.stagedMu.Unlock() + if info.transport != transportISCSI { + t.Fatalf("first stage: expected iscsi, got %q", info.transport) + } + + // Re-stage attempt with NVMe metadata (already mounted → idempotent). + mm.isMountedTargets[stagingPath] = true + _, err = ns.NodeStageVolume(context.Background(), &csi.NodeStageVolumeRequest{ + VolumeId: "upgrade-vol", + StagingTargetPath: stagingPath, + VolumeCapability: testVolCap(), + PublishContext: map[string]string{ + "iscsiAddr": "10.0.0.5:3260", + "iqn": "iqn.2024.com.seaweedfs:upgrade-vol", + "nvmeAddr": "10.0.0.5:4420", + "nqn": "nqn.2024.com.seaweedfs:upgrade-vol", + }, + }) + if err != nil { + t.Fatalf("re-stage: %v", err) + } + + // Transport must remain iSCSI — no implicit upgrade. + ns.stagedMu.Lock() + info = ns.staged["upgrade-vol"] + ns.stagedMu.Unlock() + if info.transport != transportISCSI { + t.Fatalf("re-stage: expected iscsi (sticky), got %q", info.transport) + } +} + +// --- QA-NVME-19: GetDeviceByNQN returns empty device --- + +func TestQA_NVMe_Stage_GetDeviceReturnsEmpty(t *testing.T) { + mn := newMockNVMeUtil() + mn.nvmeTCPAvailable = true + mn.getDeviceResult = "" // empty device path + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[qa-nvme] ", log.LstdFlags), + staged: make(map[string]*stagedVolumeInfo), + } + + stagingPath := t.TempDir() + + _, err := ns.NodeStageVolume(context.Background(), &csi.NodeStageVolumeRequest{ + VolumeId: "empty-dev-vol", + StagingTargetPath: stagingPath, + VolumeCapability: testVolCap(), + PublishContext: map[string]string{ + "nvmeAddr": "10.0.0.5:4420", + "nqn": "nqn.2024.com.seaweedfs:empty-dev-vol", + }, + }) + // FormatAndMount will get "" as device — should fail at mount. + // The important thing: no panic, and cleanup disconnect is called. + if err == nil { + // Some mock configs might silently pass with empty device, which is also fine + // as long as no crash occurs. + return + } + + // Verify cleanup: disconnect called. + foundDisconnect := false + for _, c := range mn.calls { + if strings.HasPrefix(c, "disconnect:") { + foundDisconnect = true + } + } + if !foundDisconnect { + t.Fatalf("expected disconnect in cleanup after empty device, got: %v", mn.calls) + } +} + +// --- QA-NVME-20: Expand with GetDeviceByNQN failure --- + +func TestQA_NVMe_Expand_GetDeviceFails(t *testing.T) { + mn := newMockNVMeUtil() + mn.getDeviceErr = errors.New("device vanished") + mi := newMockISCSIUtil() + mm := newMockMountUtil() + + ns := &nodeServer{ + nodeID: "test-node-1", + iscsiUtil: mi, + nvmeUtil: mn, + mountUtil: mm, + logger: log.New(os.Stderr, "[qa-nvme] ", log.LstdFlags), + staged: map[string]*stagedVolumeInfo{ + "dev-vanish-vol": { + nqn: "nqn.2024.com.seaweedfs:dev-vanish-vol", + transport: transportNVMe, + fsType: "ext4", + stagingPath: "/staging/vanish", + }, + }, + } + + _, err := ns.NodeExpandVolume(context.Background(), &csi.NodeExpandVolumeRequest{ + VolumeId: "dev-vanish-vol", + CapacityRange: &csi.CapacityRange{RequiredBytes: 16 * 1024 * 1024}, + }) + if err == nil { + t.Fatal("expected error when GetDeviceByNQN fails during expand") + } + if !strings.Contains(err.Error(), "device vanished") { + t.Fatalf("expected 'device vanished' in error, got: %v", err) + } +} + +// --- Helper types --- + +// isConnectedErrorNVMe wraps mockNVMeUtil but makes IsConnected return an error. +type isConnectedErrorNVMe struct { + *mockNVMeUtil + err error +} + +func (m *isConnectedErrorNVMe) IsConnected(_ context.Context, _ string) (bool, error) { + return false, m.err +} diff --git a/weed/storage/blockvol/csi/qa_cp62_test.go b/weed/storage/blockvol/csi/qa_cp62_test.go index fa5a640aa..121efb18d 100644 --- a/weed/storage/blockvol/csi/qa_cp62_test.go +++ b/weed/storage/blockvol/csi/qa_cp62_test.go @@ -132,9 +132,16 @@ func TestQA_Node_ConcurrentStageUnstage(t *testing.T) { var wg sync.WaitGroup var panicked atomic.Bool + // Pre-allocate temp dirs to avoid calling t.TempDir() inside goroutines + // (t.TempDir() panics if called after test cleanup). + stagingDirs := make([]string, 20) + for i := range stagingDirs { + stagingDirs[i] = t.TempDir() + } + for i := 0; i < 20; i++ { wg.Add(2) - go func() { + go func(dir string) { defer wg.Done() defer func() { if r := recover(); r != nil { @@ -144,10 +151,10 @@ func TestQA_Node_ConcurrentStageUnstage(t *testing.T) { }() ns.NodeStageVolume(context.Background(), &csi.NodeStageVolumeRequest{ VolumeId: "test-vol", - StagingTargetPath: t.TempDir(), + StagingTargetPath: dir, VolumeCapability: testVolCap(), }) - }() + }(stagingDirs[i]) go func() { defer wg.Done() defer func() { diff --git a/weed/storage/blockvol/csi/server.go b/weed/storage/blockvol/csi/server.go index 80676ea9a..7762085d6 100644 --- a/weed/storage/blockvol/csi/server.go +++ b/weed/storage/blockvol/csi/server.go @@ -19,6 +19,8 @@ type DriverConfig struct { DataDir string // volume data directory ISCSIAddr string // local iSCSI target listen address IQNPrefix string // IQN prefix for volumes + NVMeAddr string // local NVMe/TCP target listen address (empty = NVMe disabled) + NQNPrefix string // NQN prefix for NVMe subsystems NodeID string // node identifier Logger *log.Logger @@ -66,7 +68,8 @@ func NewCSIDriver(cfg DriverConfig) (*CSIDriver, error) { var mgr *VolumeManager needsLocalMgr := cfg.Mode == "all" && cfg.MasterAddr == "" || cfg.Mode == "node" if needsLocalMgr { - mgr = NewVolumeManager(cfg.DataDir, cfg.ISCSIAddr, cfg.IQNPrefix, cfg.Logger) + mgr = NewVolumeManager(cfg.DataDir, cfg.ISCSIAddr, cfg.IQNPrefix, cfg.Logger, + VolumeManagerOpts{NvmeAddr: cfg.NVMeAddr, NQNPrefix: cfg.NQNPrefix}) d.mgr = mgr } @@ -89,7 +92,9 @@ func NewCSIDriver(cfg DriverConfig) (*CSIDriver, error) { mgr: mgr, // may be nil in controller-only mode nodeID: cfg.NodeID, iqnPrefix: cfg.IQNPrefix, + nqnPrefix: cfg.NQNPrefix, iscsiUtil: &realISCSIUtil{}, + nvmeUtil: &realNVMeUtil{}, mountUtil: &realMountUtil{}, logger: cfg.Logger, staged: make(map[string]*stagedVolumeInfo), diff --git a/weed/storage/blockvol/csi/volume_backend.go b/weed/storage/blockvol/csi/volume_backend.go index 49d3c9f00..2a827be85 100644 --- a/weed/storage/blockvol/csi/volume_backend.go +++ b/weed/storage/blockvol/csi/volume_backend.go @@ -14,6 +14,8 @@ type VolumeInfo struct { VolumeID string ISCSIAddr string // iSCSI target address (ip:port) IQN string // iSCSI target IQN + NvmeAddr string // NVMe/TCP target address (ip:port), empty if NVMe disabled + NQN string // NVMe subsystem NQN, empty if NVMe disabled CapacityBytes uint64 } @@ -59,6 +61,8 @@ func (b *LocalVolumeBackend) CreateVolume(ctx context.Context, name string, size VolumeID: name, ISCSIAddr: b.mgr.ListenAddr(), IQN: b.mgr.VolumeIQN(name), + NvmeAddr: b.mgr.NvmeAddr(), + NQN: b.mgr.VolumeNQN(name), CapacityBytes: actualSize, }, nil } @@ -75,6 +79,8 @@ func (b *LocalVolumeBackend) LookupVolume(ctx context.Context, name string) (*Vo VolumeID: name, ISCSIAddr: b.mgr.ListenAddr(), IQN: b.mgr.VolumeIQN(name), + NvmeAddr: b.mgr.NvmeAddr(), + NQN: b.mgr.VolumeNQN(name), CapacityBytes: b.mgr.VolumeSizeBytes(name), }, nil } @@ -100,6 +106,11 @@ func (b *LocalVolumeBackend) ExpandVolume(ctx context.Context, volumeID string, } // MasterVolumeClient calls master gRPC for volume operations. +// NOTE: NvmeAddr/NQN fields in VolumeInfo are NOT populated by MasterVolumeClient +// because the master proto (CreateBlockVolumeResponse, LookupBlockVolumeResponse) +// does not yet have nvme_addr/nqn fields. This is deferred until proto is updated +// in a future CP. NVMe support via master-backend path is therefore iSCSI-only +// until that proto change lands. type MasterVolumeClient struct { masterAddr string dialOpt grpc.DialOption diff --git a/weed/storage/blockvol/csi/volume_manager.go b/weed/storage/blockvol/csi/volume_manager.go index e93b96f9a..172067667 100644 --- a/weed/storage/blockvol/csi/volume_manager.go +++ b/weed/storage/blockvol/csi/volume_manager.go @@ -25,6 +25,7 @@ type managedVolume struct { vol *blockvol.BlockVol path string // file path to .blk file iqn string // target IQN for this volume + nqn string // NVMe subsystem NQN for this volume sizeBytes uint64 } @@ -45,19 +46,27 @@ type VolumeManager struct { volumes map[string]*managedVolume target *iscsi.TargetServer iqnPrefix string + nqnPrefix string config iscsi.TargetConfig logger *log.Logger state managerState iscsiAddr string + nvmeAddr string +} + +// VolumeManagerOpts holds optional configuration for VolumeManager. +type VolumeManagerOpts struct { + NvmeAddr string // NVMe/TCP target address (ip:port), empty if NVMe disabled + NQNPrefix string // NQN prefix for NVMe subsystems } // NewVolumeManager creates a new VolumeManager. -func NewVolumeManager(dataDir, iscsiAddr, iqnPrefix string, logger *log.Logger) *VolumeManager { +func NewVolumeManager(dataDir, iscsiAddr, iqnPrefix string, logger *log.Logger, opts ...VolumeManagerOpts) *VolumeManager { if logger == nil { logger = log.Default() } config := iscsi.DefaultTargetConfig() - return &VolumeManager{ + vm := &VolumeManager{ dataDir: dataDir, volumes: make(map[string]*managedVolume), iqnPrefix: iqnPrefix, @@ -65,6 +74,11 @@ func NewVolumeManager(dataDir, iscsiAddr, iqnPrefix string, logger *log.Logger) logger: logger, iscsiAddr: iscsiAddr, } + if len(opts) > 0 { + vm.nvmeAddr = opts[0].NvmeAddr + vm.nqnPrefix = opts[0].NQNPrefix + } + return vm } // Start initializes and starts the shared TargetServer. @@ -175,6 +189,7 @@ func (m *VolumeManager) CreateVolume(name string, sizeBytes uint64) error { vol: vol, path: volPath, iqn: iqn, + nqn: m.volumeNQN(name), sizeBytes: info.VolumeSize, } m.logger.Printf("adopted existing volume %q: %s (%d bytes)", name, iqn, info.VolumeSize) @@ -198,6 +213,7 @@ func (m *VolumeManager) CreateVolume(name string, sizeBytes uint64) error { vol: vol, path: volPath, iqn: iqn, + nqn: m.volumeNQN(name), sizeBytes: sizeBytes, } @@ -267,6 +283,7 @@ func (m *VolumeManager) OpenVolume(name string) error { vol: vol, path: volPath, iqn: iqn, + nqn: m.volumeNQN(name), sizeBytes: info.VolumeSize, } @@ -325,6 +342,23 @@ func (m *VolumeManager) ListenAddr() string { return "" } +// NvmeAddr returns the NVMe/TCP target address, or empty if NVMe is disabled. +func (m *VolumeManager) NvmeAddr() string { + return m.nvmeAddr +} + +// VolumeNQN returns the NVMe NQN for a volume name. Returns empty if nqnPrefix is not set. +func (m *VolumeManager) VolumeNQN(name string) string { + if m.nqnPrefix == "" { + return "" + } + return m.volumeNQN(name) +} + +func (m *VolumeManager) volumeNQN(name string) string { + return blockvol.BuildNQN(m.nqnPrefix, name) +} + // WithVolume runs fn while holding the manager lock with a reference to the volume. func (m *VolumeManager) WithVolume(name string, fn func(*blockvol.BlockVol) error) error { m.mu.RLock() diff --git a/weed/storage/blockvol/naming.go b/weed/storage/blockvol/naming.go index 22d1d9041..d4df874d4 100644 --- a/weed/storage/blockvol/naming.go +++ b/weed/storage/blockvol/naming.go @@ -16,6 +16,14 @@ func SanitizeFilename(name string) string { return reInvalidFilename.ReplaceAllString(strings.ToLower(name), "-") } +// BuildNQN constructs an NVMe NQN from a prefix and volume name. +// The prefix must already include the separator (e.g. "nqn.2024-01.com.seaweedfs:vol."). +// This is the single source of truth for NQN construction — used by both +// the volume server (BlockService) and the CSI driver (VolumeManager/nodeServer). +func BuildNQN(prefix, name string) string { + return prefix + SanitizeIQN(name) +} + // SanitizeIQN normalizes a CSI volume ID for use in an IQN. // Lowercases, replaces invalid chars with '-', truncates to 64 chars. // When truncation is needed, a hash suffix is appended to preserve uniqueness.