From 15f4a97029a709ff12caac9a208c0d1d54febf42 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 18 Mar 2026 23:28:07 -0700 Subject: [PATCH] fix: improve raft leader election reliability and failover speed (#8692) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: clear raft vote state file on non-resume startup The seaweedfs/raft library v1.1.7 added a persistent `state` file for currentTerm and votedFor. When RaftResumeState=false (the default), the log, conf, and snapshot directories are cleared but this state file was not. On repeated restarts, different masters accumulate divergent terms, causing AppendEntries rejections and preventing leader election. Fixes #8690 * fix: recover TopologyId from snapshot before clearing raft state When RaftResumeState=false clears log/conf/snapshot, the TopologyId (used for license validation) was lost. Now extract it from the latest snapshot before cleanup and restore it on the topology. Both seaweedfs/raft and hashicorp/raft paths are handled, with a shared recoverTopologyIdFromState helper in raft_common.go. * fix: stagger multi-master bootstrap delay by peer index Previously all masters used a fixed 1500ms delay before the bootstrap check. Now the delay is proportional to the peer's sorted index with randomization (matching the hashicorp raft path), giving the designated bootstrap node (peer 0) a head start while later peers wait for gRPC servers to be ready. Also adds diagnostic logging showing why DoJoinCommand was or wasn't called, making leader election issues easier to diagnose from logs. * fix: skip unreachable masters during leader reconnection When a master leader goes down, non-leader masters still redirect clients to the stale leader address. The masterClient would follow these redirects, fail, and retry — wasting round-trips each cycle. Now tryAllMasters tracks which masters failed within a cycle and skips redirects pointing to them, reducing log spam and connection overhead during leader failover. * fix: take snapshot after TopologyId generation for recovery After generating a new TopologyId on the leader, immediately take a raft snapshot so the ID can be recovered from the snapshot on future restarts with RaftResumeState=false. Without this, short-lived clusters would lose the TopologyId on restart since no automatic snapshot had been taken yet. * test: add multi-master raft failover integration tests Integration test framework and 5 test scenarios for 3-node master clusters: - TestLeaderConsistencyAcrossNodes: all nodes agree on leader and TopologyId - TestLeaderDownAndRecoverQuickly: leader stops, new leader elected, old leader rejoins as follower - TestLeaderDownSlowRecover: leader gone for extended period, cluster continues with 2/3 quorum - TestTwoMastersDownAndRestart: quorum lost (2/3 down), recovered when both restart - TestAllMastersDownAndRestart: full cluster restart, leader elected, all nodes agree on TopologyId * fix: address PR review comments - peerIndex: return -1 (not 0) when self not found, add warning log - recoverTopologyIdFromSnapshot: defer dir.Close() - tests: check GetTopologyId errors instead of discarding them * fix: address review comments on failover tests - Assert no leader after quorum loss (was only logging) - Verify follower cs.Leader matches expected leader via ServerAddress.ToHttpAddress() comparison - Check GetTopologyId error in TestTwoMastersDownAndRestart --- test/multi_master/cluster.go | 437 +++++++++++++++++++++++++++++ test/multi_master/failover_test.go | 313 +++++++++++++++++++++ weed/command/master.go | 35 ++- weed/command/master_test.go | 6 +- weed/server/master_server.go | 17 ++ weed/server/raft_common.go | 18 ++ weed/server/raft_hashicorp.go | 25 ++ weed/server/raft_server.go | 53 ++++ weed/wdclient/masterclient.go | 15 +- 9 files changed, 908 insertions(+), 11 deletions(-) create mode 100644 test/multi_master/cluster.go create mode 100644 test/multi_master/failover_test.go diff --git a/test/multi_master/cluster.go b/test/multi_master/cluster.go new file mode 100644 index 000000000..ac84ddb9c --- /dev/null +++ b/test/multi_master/cluster.go @@ -0,0 +1,437 @@ +package multi_master + +import ( + "bufio" + "bytes" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "os" + "os/exec" + "path/filepath" + "runtime" + "strconv" + "strings" + "sync" + "testing" + "time" +) + +const ( + waitTimeout = 30 * time.Second + waitTick = 200 * time.Millisecond +) + +// masterNode represents a single master process in the cluster. +type masterNode struct { + port int + grpcPort int + dataDir string + cmd *exec.Cmd + logFile string + stopped bool +} + +// MasterCluster manages a 3-node master raft cluster for integration tests. +type MasterCluster struct { + t testing.TB + weedBinary string + baseDir string + logsDir string + keepLogs bool + + nodes [3]*masterNode + mu sync.Mutex + + // peers string shared by all nodes, e.g. "127.0.0.1:9333,127.0.0.1:9334,127.0.0.1:9335" + peersStr string +} + +// clusterStatus is the JSON returned by /cluster/status. +type clusterStatus struct { + IsLeader bool `json:"IsLeader"` + Leader string `json:"Leader"` + Peers []string `json:"Peers"` +} + +// StartMasterCluster boots a 3-node master raft cluster and waits for a leader. +func StartMasterCluster(t testing.TB) *MasterCluster { + t.Helper() + + weedBinary, err := findOrBuildWeedBinary() + if err != nil { + t.Fatalf("resolve weed binary: %v", err) + } + + keepLogs := os.Getenv("MULTI_MASTER_IT_KEEP_LOGS") == "1" + baseDir, err := os.MkdirTemp("", "seaweedfs_multi_master_it_") + if err != nil { + t.Fatalf("create temp dir: %v", err) + } + logsDir := filepath.Join(baseDir, "logs") + os.MkdirAll(logsDir, 0o755) + + // Allocate 3 port pairs (http, grpc) atomically to prevent reuse. + portPairs, err := allocateMultipleMasterPortPairs(3) + if err != nil { + t.Fatalf("allocate ports: %v", err) + } + var nodes [3]*masterNode + var peerParts []string + for i, pp := range portPairs { + dataDir := filepath.Join(baseDir, fmt.Sprintf("m%d", i)) + os.MkdirAll(dataDir, 0o755) + nodes[i] = &masterNode{ + port: pp[0], + grpcPort: pp[1], + dataDir: dataDir, + logFile: filepath.Join(logsDir, fmt.Sprintf("master%d.log", i)), + } + peerParts = append(peerParts, fmt.Sprintf("127.0.0.1:%d", pp[0])) + } + + mc := &MasterCluster{ + t: t, + weedBinary: weedBinary, + baseDir: baseDir, + logsDir: logsDir, + keepLogs: keepLogs, + nodes: nodes, + peersStr: strings.Join(peerParts, ","), + } + + for i := range 3 { + mc.StartNode(i) + } + + if err := mc.WaitForLeader(waitTimeout); err != nil { + mc.DumpLogs() + mc.StopAll() + t.Fatalf("cluster did not elect a leader: %v", err) + } + + // Wait for TopologyId to be generated and propagated. This is async + // after leader election, and we need it committed before tests can + // reliably stop/restart nodes. + if err := mc.WaitForTopologyId(waitTimeout); err != nil { + mc.DumpLogs() + mc.StopAll() + t.Fatalf("TopologyId not generated: %v", err) + } + + t.Cleanup(func() { + mc.StopAll() + }) + return mc +} + +// StartNode starts the master process at the given index (0–2). +func (mc *MasterCluster) StartNode(i int) { + mc.t.Helper() + mc.mu.Lock() + defer mc.mu.Unlock() + + n := mc.nodes[i] + if n.cmd != nil && !n.stopped { + return // already running + } + + logFile, err := os.OpenFile(n.logFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + mc.t.Fatalf("create log for node %d: %v", i, err) + } + + args := []string{ + "master", + "-ip=127.0.0.1", + "-port=" + strconv.Itoa(n.port), + "-port.grpc=" + strconv.Itoa(n.grpcPort), + "-mdir=" + n.dataDir, + "-peers=" + mc.peersStr, + "-electionTimeout=3s", + "-volumeSizeLimitMB=32", + "-defaultReplication=000", + } + + n.cmd = exec.Command(mc.weedBinary, args...) + n.cmd.Dir = mc.baseDir + n.cmd.Stdout = logFile + n.cmd.Stderr = logFile + n.stopped = false + if err := n.cmd.Start(); err != nil { + mc.t.Fatalf("start node %d: %v", i, err) + } +} + +// StopNode gracefully stops the master at the given index. +func (mc *MasterCluster) StopNode(i int) { + mc.mu.Lock() + defer mc.mu.Unlock() + mc.stopNodeLocked(i) +} + +func (mc *MasterCluster) stopNodeLocked(i int) { + n := mc.nodes[i] + if n.cmd == nil || n.stopped { + return + } + n.stopped = true + _ = n.cmd.Process.Signal(os.Interrupt) + done := make(chan error, 1) + go func() { done <- n.cmd.Wait() }() + select { + case <-time.After(10 * time.Second): + _ = n.cmd.Process.Kill() + <-done + case <-done: + } +} + +// StopAll stops all running master nodes. +func (mc *MasterCluster) StopAll() { + mc.mu.Lock() + defer mc.mu.Unlock() + for i := range 3 { + mc.stopNodeLocked(i) + } + if !mc.keepLogs && !mc.t.Failed() { + os.RemoveAll(mc.baseDir) + } else if mc.baseDir != "" { + mc.t.Logf("multi-master logs kept at %s", mc.baseDir) + } +} + +// NodeURL returns the HTTP URL for node i. +func (mc *MasterCluster) NodeURL(i int) string { + return fmt.Sprintf("http://127.0.0.1:%d", mc.nodes[i].port) +} + +// NodeAddress returns "127.0.0.1:port" for node i. +func (mc *MasterCluster) NodeAddress(i int) string { + return fmt.Sprintf("127.0.0.1:%d", mc.nodes[i].port) +} + +// NodeGRPCAddress returns "127.0.0.1:grpcPort" for node i. +func (mc *MasterCluster) NodeGRPCAddress(i int) string { + return fmt.Sprintf("127.0.0.1:%d", mc.nodes[i].grpcPort) +} + +// IsNodeRunning returns true if the node at index i has a live process. +func (mc *MasterCluster) IsNodeRunning(i int) bool { + mc.mu.Lock() + defer mc.mu.Unlock() + n := mc.nodes[i] + return n.cmd != nil && !n.stopped +} + +// GetClusterStatus fetches /cluster/status from node i. +func (mc *MasterCluster) GetClusterStatus(i int) (*clusterStatus, error) { + client := &http.Client{Timeout: 2 * time.Second} + resp, err := client.Get(mc.NodeURL(i) + "/cluster/status") + if err != nil { + return nil, err + } + defer resp.Body.Close() + body, _ := io.ReadAll(resp.Body) + var cs clusterStatus + if err := json.Unmarshal(body, &cs); err != nil { + return nil, fmt.Errorf("parse cluster/status: %w (body: %s)", err, string(body)) + } + return &cs, nil +} + +// GetTopologyId fetches the TopologyId from /dir/status on node i. +func (mc *MasterCluster) GetTopologyId(i int) (string, error) { + client := &http.Client{Timeout: 2 * time.Second} + resp, err := client.Get(mc.NodeURL(i) + "/dir/status") + if err != nil { + return "", err + } + defer resp.Body.Close() + body, _ := io.ReadAll(resp.Body) + var raw map[string]any + if err := json.Unmarshal(body, &raw); err != nil { + return "", err + } + if id, ok := raw["TopologyId"].(string); ok { + return id, nil + } + return "", nil +} + +// FindLeader returns the index of the leader node and its address. +// Returns -1 if no leader is found. +func (mc *MasterCluster) FindLeader() (int, string) { + for i := range 3 { + if !mc.IsNodeRunning(i) { + continue + } + cs, err := mc.GetClusterStatus(i) + if err != nil { + continue + } + if cs.IsLeader { + return i, mc.NodeAddress(i) + } + } + return -1, "" +} + +// WaitForLeader polls until a leader is elected or timeout. +func (mc *MasterCluster) WaitForLeader(timeout time.Duration) error { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if idx, _ := mc.FindLeader(); idx >= 0 { + return nil + } + time.Sleep(waitTick) + } + return fmt.Errorf("no leader elected within %v", timeout) +} + +// WaitForNewLeader waits for a leader that is different from the given address. +func (mc *MasterCluster) WaitForNewLeader(oldLeaderAddr string, timeout time.Duration) (int, string, error) { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + idx, addr := mc.FindLeader() + if idx >= 0 && addr != oldLeaderAddr { + return idx, addr, nil + } + time.Sleep(waitTick) + } + return -1, "", fmt.Errorf("no new leader (different from %s) within %v", oldLeaderAddr, timeout) +} + +// WaitForTopologyId waits until the leader reports a non-empty TopologyId. +func (mc *MasterCluster) WaitForTopologyId(timeout time.Duration) error { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if idx, _ := mc.FindLeader(); idx >= 0 { + if id, err := mc.GetTopologyId(idx); err == nil && id != "" { + return nil + } + } + time.Sleep(waitTick) + } + return fmt.Errorf("TopologyId not available within %v", timeout) +} + +// WaitForNodeReady waits for node i to respond to HTTP. +func (mc *MasterCluster) WaitForNodeReady(i int, timeout time.Duration) error { + client := &http.Client{Timeout: 1 * time.Second} + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + resp, err := client.Get(mc.NodeURL(i) + "/cluster/status") + if err == nil { + resp.Body.Close() + return nil + } + time.Sleep(waitTick) + } + return fmt.Errorf("node %d not ready within %v", i, timeout) +} + +// DumpLogs prints the tail of all master logs. +func (mc *MasterCluster) DumpLogs() { + for i := range 3 { + mc.t.Logf("=== master%d log tail ===\n%s", i, mc.tailLog(i)) + } +} + +func (mc *MasterCluster) tailLog(i int) string { + f, err := os.Open(mc.nodes[i].logFile) + if err != nil { + return "(no log)" + } + defer f.Close() + scanner := bufio.NewScanner(f) + lines := make([]string, 0, 50) + for scanner.Scan() { + lines = append(lines, scanner.Text()) + if len(lines) > 50 { + lines = lines[1:] + } + } + return strings.Join(lines, "\n") +} + +// --- port and binary helpers (adapted from test/volume_server/framework) --- + +// allocateMultipleMasterPortPairs finds n non-overlapping (http, grpc) port +// pairs, holding all listeners until all are found, then releasing them +// together to avoid races between consecutive allocations. +func allocateMultipleMasterPortPairs(n int) ([][2]int, error) { + var listeners []net.Listener + var pairs [][2]int + + defer func() { + for _, l := range listeners { + l.Close() + } + }() + + for masterPort := 10000; masterPort <= 55535 && len(pairs) < n; masterPort++ { + grpcPort := masterPort + 10000 + l1, err := net.Listen("tcp", net.JoinHostPort("127.0.0.1", strconv.Itoa(masterPort))) + if err != nil { + continue + } + l2, err := net.Listen("tcp", net.JoinHostPort("127.0.0.1", strconv.Itoa(grpcPort))) + if err != nil { + l1.Close() + continue + } + listeners = append(listeners, l1, l2) + pairs = append(pairs, [2]int{masterPort, grpcPort}) + } + + if len(pairs) < n { + return nil, fmt.Errorf("could only allocate %d of %d master port pairs", len(pairs), n) + } + return pairs, nil +} + +func findOrBuildWeedBinary() (string, error) { + if fromEnv := os.Getenv("WEED_BINARY"); fromEnv != "" { + if isExecutableFile(fromEnv) { + return fromEnv, nil + } + return "", fmt.Errorf("WEED_BINARY not executable: %s", fromEnv) + } + + repoRoot := "" + if _, file, _, ok := runtime.Caller(0); ok { + repoRoot = filepath.Clean(filepath.Join(filepath.Dir(file), "..", "..")) + } + if repoRoot == "" { + return "", fmt.Errorf("unable to detect repository root") + } + + // Check if already built + binDir := filepath.Join(os.TempDir(), "seaweedfs_multi_master_it_bin") + os.MkdirAll(binDir, 0o755) + binPath := filepath.Join(binDir, "weed") + if isExecutableFile(binPath) { + return binPath, nil + } + + cmd := exec.Command("go", "build", "-o", binPath, ".") + cmd.Dir = filepath.Join(repoRoot, "weed") + var out bytes.Buffer + cmd.Stdout = &out + cmd.Stderr = &out + if err := cmd.Run(); err != nil { + return "", fmt.Errorf("build weed binary: %w\n%s", err, out.String()) + } + return binPath, nil +} + +func isExecutableFile(path string) bool { + info, err := os.Stat(path) + if err != nil || info.IsDir() { + return false + } + return info.Mode().Perm()&0o111 != 0 +} diff --git a/test/multi_master/failover_test.go b/test/multi_master/failover_test.go new file mode 100644 index 000000000..45ba204d4 --- /dev/null +++ b/test/multi_master/failover_test.go @@ -0,0 +1,313 @@ +package multi_master + +import ( + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb" +) + +const ( + // Election timeout is 3s in our cluster config; allow generous margin. + leaderElectionTimeout = 20 * time.Second +) + +// TestLeaderDownAndRecoverQuickly verifies that when the leader is stopped and +// restarted quickly, the cluster re-elects a leader and the restarted node +// rejoins as a follower. TopologyId must be consistent across all nodes. +func TestLeaderDownAndRecoverQuickly(t *testing.T) { + mc := StartMasterCluster(t) + + // Record initial state. + leaderIdx, leaderAddr := mc.FindLeader() + if leaderIdx < 0 { + t.Fatal("no leader found after cluster start") + } + t.Logf("initial leader: node %d at %s", leaderIdx, leaderAddr) + + topologyId, err := mc.GetTopologyId(leaderIdx) + if err != nil || topologyId == "" { + t.Fatalf("failed to get initial TopologyId: %v", err) + } + t.Logf("initial TopologyId: %s", topologyId) + + // Stop the leader. + mc.StopNode(leaderIdx) + t.Logf("stopped leader node %d", leaderIdx) + + // Wait for a new leader from the remaining 2 nodes. + newLeaderIdx, newLeaderAddr, err := mc.WaitForNewLeader(leaderAddr, leaderElectionTimeout) + if err != nil { + mc.DumpLogs() + t.Fatalf("new leader not elected after stopping old leader: %v", err) + } + t.Logf("new leader: node %d at %s", newLeaderIdx, newLeaderAddr) + + // Restart the old leader quickly. + mc.StartNode(leaderIdx) + if err := mc.WaitForNodeReady(leaderIdx, waitTimeout); err != nil { + mc.DumpLogs() + t.Fatalf("restarted node %d not ready: %v", leaderIdx, err) + } + t.Logf("restarted node %d", leaderIdx) + + // Give raft time to settle. + time.Sleep(3 * time.Second) + + // Verify leader is stable. + finalLeaderIdx, _ := mc.FindLeader() + if finalLeaderIdx < 0 { + mc.DumpLogs() + t.Fatal("no leader after restarting old leader node") + } + + // Verify TopologyId is consistent across all nodes. + assertTopologyIdConsistent(t, mc, topologyId) +} + +// TestLeaderDownSlowRecover verifies that when the leader goes down and takes +// a long time to come back, the remaining 2 nodes elect a new leader and the +// cluster continues to function. When the slow node returns, it rejoins. +func TestLeaderDownSlowRecover(t *testing.T) { + mc := StartMasterCluster(t) + + leaderIdx, leaderAddr := mc.FindLeader() + if leaderIdx < 0 { + t.Fatal("no leader found") + } + topologyId, err := mc.GetTopologyId(leaderIdx) + if err != nil || topologyId == "" { + t.Fatalf("failed to get initial TopologyId: %v", err) + } + t.Logf("initial leader: node %d, TopologyId: %s", leaderIdx, topologyId) + + // Stop the leader. + mc.StopNode(leaderIdx) + + // Wait for a new leader. + newLeaderIdx, _, err := mc.WaitForNewLeader(leaderAddr, leaderElectionTimeout) + if err != nil { + mc.DumpLogs() + t.Fatalf("new leader not elected: %v", err) + } + t.Logf("new leader: node %d", newLeaderIdx) + + // Verify cluster functions with only 2 nodes (quorum is 2/3). + cs, err := mc.GetClusterStatus(newLeaderIdx) + if err != nil { + mc.DumpLogs() + t.Fatalf("cannot get cluster status from new leader: %v", err) + } + if !cs.IsLeader { + t.Fatalf("node %d claims not to be leader", newLeaderIdx) + } + + // Simulate slow recovery: wait significantly longer than election timeout. + t.Log("simulating slow recovery (10 seconds)...") + time.Sleep(10 * time.Second) + + // Verify leader is still stable during the outage. + stableLeaderIdx, _ := mc.FindLeader() + if stableLeaderIdx < 0 { + mc.DumpLogs() + t.Fatal("leader lost during extended outage of one node") + } + + // Restart the downed node. + mc.StartNode(leaderIdx) + if err := mc.WaitForNodeReady(leaderIdx, waitTimeout); err != nil { + mc.DumpLogs() + t.Fatalf("slow-recovered node %d not ready: %v", leaderIdx, err) + } + + time.Sleep(3 * time.Second) + assertTopologyIdConsistent(t, mc, topologyId) +} + +// TestTwoMastersDownAndRestart verifies that when 2 of 3 masters go down +// (losing quorum), the cluster cannot elect a leader. When both restart, +// a leader is elected and TopologyId is preserved. +func TestTwoMastersDownAndRestart(t *testing.T) { + mc := StartMasterCluster(t) + + leaderIdx, _ := mc.FindLeader() + if leaderIdx < 0 { + t.Fatal("no leader found") + } + topologyId, err := mc.GetTopologyId(leaderIdx) + if err != nil || topologyId == "" { + t.Fatalf("failed to get initial TopologyId: %v", err) + } + t.Logf("initial TopologyId: %s", topologyId) + + // Determine which 2 nodes to stop (stop the leader + one follower). + down1 := leaderIdx + down2 := (leaderIdx + 1) % 3 + survivor := (leaderIdx + 2) % 3 + t.Logf("stopping nodes %d and %d, keeping node %d", down1, down2, survivor) + + mc.StopNode(down1) + mc.StopNode(down2) + + // The surviving node alone cannot form a quorum — no leader expected. + // Wait long enough for any stale leadership to expire (election timeout + // is 3s in our config, quorum check fires every election timeout). + time.Sleep(5 * time.Second) + soloLeaderIdx, _ := mc.FindLeader() + if soloLeaderIdx >= 0 { + // It's possible the survivor briefly thinks it's leader before stepping down. + // Give it time to realize it lost quorum. + time.Sleep(5 * time.Second) + soloLeaderIdx, _ = mc.FindLeader() + } + if soloLeaderIdx >= 0 { + mc.DumpLogs() + t.Fatalf("expected no leader with only 1 of 3 nodes, but node %d claims leadership", soloLeaderIdx) + } + + // Restart both downed nodes. + mc.StartNode(down1) + mc.StartNode(down2) + for _, i := range []int{down1, down2} { + if err := mc.WaitForNodeReady(i, waitTimeout); err != nil { + mc.DumpLogs() + t.Fatalf("restarted node %d not ready: %v", i, err) + } + } + + // Wait for leader election. + if err := mc.WaitForLeader(leaderElectionTimeout); err != nil { + mc.DumpLogs() + t.Fatalf("no leader after restarting 2 downed nodes: %v", err) + } + + time.Sleep(3 * time.Second) + assertTopologyIdConsistent(t, mc, topologyId) +} + +// TestAllMastersDownAndRestart verifies that when all 3 masters are stopped +// and restarted, the cluster elects a leader and all nodes agree on a +// TopologyId. With RaftResumeState=false (default), raft state is cleared on +// restart. The TopologyId is recovered from snapshots when available; on a +// short-lived cluster that hasn't taken snapshots on all nodes, a new +// TopologyId may be generated — but all nodes must still agree. +func TestAllMastersDownAndRestart(t *testing.T) { + mc := StartMasterCluster(t) + + leaderIdx, _ := mc.FindLeader() + if leaderIdx < 0 { + t.Fatal("no leader found") + } + topologyId, _ := mc.GetTopologyId(leaderIdx) + if topologyId == "" { + t.Fatal("no TopologyId on initial leader") + } + t.Logf("initial TopologyId: %s", topologyId) + + // Stop all nodes. + for i := range 3 { + mc.StopNode(i) + } + t.Log("all nodes stopped") + + time.Sleep(2 * time.Second) + + // Restart all nodes. + for i := range 3 { + mc.StartNode(i) + } + for i := range 3 { + if err := mc.WaitForNodeReady(i, waitTimeout); err != nil { + mc.DumpLogs() + t.Fatalf("node %d not ready after full restart: %v", i, err) + } + } + + // Wait for leader. + if err := mc.WaitForLeader(leaderElectionTimeout); err != nil { + mc.DumpLogs() + t.Fatalf("no leader after full cluster restart: %v", err) + } + + newLeaderIdx, _ := mc.FindLeader() + t.Logf("leader after full restart: node %d", newLeaderIdx) + + time.Sleep(3 * time.Second) + + // All nodes must agree on a TopologyId (may differ from original if + // snapshots were not yet taken on all nodes before shutdown). + newTopologyId, err := mc.GetTopologyId(newLeaderIdx) + if err != nil || newTopologyId == "" { + mc.DumpLogs() + t.Fatal("no TopologyId after full restart") + } + if newTopologyId == topologyId { + t.Logf("TopologyId preserved across full restart: %s", topologyId) + } else { + t.Logf("TopologyId changed (expected for short-lived cluster without snapshots): %s -> %s", topologyId, newTopologyId) + } + assertTopologyIdConsistent(t, mc, newTopologyId) +} + +// TestLeaderConsistencyAcrossNodes verifies that all nodes agree on who the +// leader is and report the same TopologyId. +func TestLeaderConsistencyAcrossNodes(t *testing.T) { + mc := StartMasterCluster(t) + + // Allow cluster to stabilize. + time.Sleep(3 * time.Second) + + leaderIdx, leaderAddr := mc.FindLeader() + if leaderIdx < 0 { + t.Fatal("no leader found") + } + t.Logf("leader: node %d at %s", leaderIdx, leaderAddr) + + // Every node should agree on the leader. + for i := range 3 { + cs, err := mc.GetClusterStatus(i) + if err != nil { + t.Fatalf("node %d cluster/status error: %v", i, err) + } + if i == leaderIdx { + if !cs.IsLeader { + t.Errorf("node %d should be leader but IsLeader=false", i) + } + } else { + if cs.IsLeader { + t.Errorf("node %d should not be leader but IsLeader=true", i) + } + // cs.Leader is a ServerAddress like "127.0.0.1:10000.20000"; + // convert to HTTP address for comparison with leaderAddr. + leaderHttp := pb.ServerAddress(cs.Leader).ToHttpAddress() + if leaderHttp != leaderAddr { + t.Errorf("node %d reports leader %q (http: %s), expected %q", i, cs.Leader, leaderHttp, leaderAddr) + } + } + } + + // All nodes should have the same TopologyId. + topologyId, _ := mc.GetTopologyId(leaderIdx) + if topologyId == "" { + t.Fatal("leader has no TopologyId") + } + assertTopologyIdConsistent(t, mc, topologyId) +} + +// assertTopologyIdConsistent verifies that all running nodes report the expected TopologyId. +func assertTopologyIdConsistent(t *testing.T, mc *MasterCluster, expectedId string) { + t.Helper() + for i := range 3 { + if !mc.IsNodeRunning(i) { + continue + } + id, err := mc.GetTopologyId(i) + if err != nil { + t.Errorf("node %d: failed to get TopologyId: %v", i, err) + continue + } + if id != expectedId { + t.Errorf("node %d: TopologyId=%q, expected %q", i, id, expectedId) + } + } +} diff --git a/weed/command/master.go b/weed/command/master.go index 565c9cc58..dd880acd4 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "fmt" + "math/rand/v2" "net" "net/http" "os" @@ -257,12 +258,27 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { // For multi-master mode with non-Hashicorp raft, wait and check if we should join if !*masterOption.raftHashicorp && !isSingleMaster { go func() { - time.Sleep(raftJoinCheckDelay) + // Stagger bootstrap by peer index so masters don't all check + // simultaneously. Peer 0 waits ~1.5s, peer 1 ~3s, etc. + idx := peerIndex(myMasterAddress, peers) + delay := time.Duration(float64(raftJoinCheckDelay) * (rand.Float64()*0.25 + 1) * float64(idx+1)) + glog.V(0).Infof("bootstrap check in %v (peer index %d of %d)", delay, idx, len(peers)) + time.Sleep(delay) ms.Topo.RaftServerAccessLock.RLock() isEmptyMaster := ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() - if isEmptyMaster && isTheFirstOne(myMasterAddress, peers) && ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" { - raftServer.DoJoinCommand() + isFirst := idx == 0 + if isEmptyMaster && isFirst { + existingLeader := ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) + if existingLeader == "" { + raftServer.DoJoinCommand() + } else { + glog.V(0).Infof("skip bootstrap: existing leader %s found from peers", existingLeader) + } + } else if !isEmptyMaster { + glog.V(0).Infof("skip bootstrap: leader=%q logEmpty=%v", ms.Topo.RaftServer.Leader(), ms.Topo.RaftServer.IsLogEmpty()) + } else { + glog.V(0).Infof("skip bootstrap: %v is not the first master in peers (index %d)", myMasterAddress, idx) } ms.Topo.RaftServerAccessLock.RUnlock() }() @@ -385,14 +401,19 @@ func normalizeMasterPeerAddress(peer pb.ServerAddress, self pb.ServerAddress) pb return pb.NewServerAddressWithGrpcPort(peer.ToHttpAddress(), grpcPortValue) } -func isTheFirstOne(self pb.ServerAddress, peers []pb.ServerAddress) bool { +// peerIndex returns the 0-based position of self in the sorted peer list. +// Peer 0 is the designated bootstrap node. Returns -1 if self is not found. +func peerIndex(self pb.ServerAddress, peers []pb.ServerAddress) int { slices.SortFunc(peers, func(a, b pb.ServerAddress) int { return strings.Compare(a.ToHttpAddress(), b.ToHttpAddress()) }) - if len(peers) <= 0 { - return true + for i, peer := range peers { + if peer.ToHttpAddress() == self.ToHttpAddress() { + return i + } } - return self.ToHttpAddress() == peers[0].ToHttpAddress() + glog.Warningf("peerIndex: self %s not found in peers %v", self, peers) + return -1 } func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOption { diff --git a/weed/command/master_test.go b/weed/command/master_test.go index b92254606..e35a9a70b 100644 --- a/weed/command/master_test.go +++ b/weed/command/master_test.go @@ -6,7 +6,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb" ) -func TestIsTheFirstOneIgnoresGrpcPort(t *testing.T) { +func TestPeerIndexIgnoresGrpcPort(t *testing.T) { self := pb.ServerAddress("127.0.0.1:9000.19000") peers := []pb.ServerAddress{ "127.0.0.1:9000", @@ -14,8 +14,8 @@ func TestIsTheFirstOneIgnoresGrpcPort(t *testing.T) { "127.0.0.1:9003.19003", } - if !isTheFirstOne(self, peers) { - t.Fatalf("expected first peer match by HTTP address between %q and %+v", self, peers) + if idx := peerIndex(self, peers); idx != 0 { + t.Fatalf("expected peer index 0 for %q among %+v, got %d", self, peers, idx) } } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index b65869107..f403ff771 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -297,11 +297,28 @@ func (ms *MasterServer) ensureTopologyId() { currentId := ms.Topo.GetTopologyId() glog.V(1).Infof("ensureTopologyId: current TopologyId after barrier: %s", currentId) + prevId := ms.Topo.GetTopologyId() + EnsureTopologyId(ms.Topo, func() bool { return ms.Topo.IsLeader() }, func(topologyId string) error { return ms.syncRaftForTopologyId(topologyId) }) + + // If a new TopologyId was generated, take a snapshot so it survives + // raft state cleanup on future non-resume restarts. + if prevId == "" && ms.Topo.GetTopologyId() != "" { + ms.Topo.RaftServerAccessLock.RLock() + if ms.Topo.RaftServer != nil { + if err := ms.Topo.RaftServer.TakeSnapshot(); err != nil { + glog.Warningf("snapshot after TopologyId generation: %v", err) + } else { + glog.V(0).Infof("snapshot taken to persist TopologyId %s", ms.Topo.GetTopologyId()) + } + } + // Hashicorp raft snapshots are handled automatically. + ms.Topo.RaftServerAccessLock.RUnlock() + } } func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc { diff --git a/weed/server/raft_common.go b/weed/server/raft_common.go index 09aeaaac0..9a87a9952 100644 --- a/weed/server/raft_common.go +++ b/weed/server/raft_common.go @@ -1,6 +1,7 @@ package weed_server import ( + "encoding/json" "time" "github.com/google/uuid" @@ -8,6 +9,23 @@ import ( "github.com/seaweedfs/seaweedfs/weed/topology" ) +// recoverTopologyIdFromState restores the TopologyId from serialized FSM +// state bytes (JSON-encoded MaxVolumeIdCommand). Both raft implementations +// call this after reading their snapshot in their own format. +func recoverTopologyIdFromState(fsmState []byte, topo *topology.Topology) { + if topo.GetTopologyId() != "" { + return + } + var cmd topology.MaxVolumeIdCommand + if err := json.Unmarshal(fsmState, &cmd); err != nil { + return + } + if cmd.TopologyId != "" { + topo.SetTopologyId(cmd.TopologyId) + glog.V(0).Infof("Recovered TopologyId from snapshot: %s", cmd.TopologyId) + } +} + // EnsureTopologyId ensures that a TopologyId is generated and persisted if it's currently missing. // It uses the provided checkLeaderFn to verify leadership and persistFn to save the new ID. func EnsureTopologyId(topo *topology.Topology, checkLeaderFn func() bool, persistFn func(string) error) { diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go index 99d31b1d1..575478aec 100644 --- a/weed/server/raft_hashicorp.go +++ b/weed/server/raft_hashicorp.go @@ -6,6 +6,7 @@ package weed_server import ( "encoding/json" "fmt" + "io" "math/rand/v2" "os" "path" @@ -56,6 +57,28 @@ func raftServerID(server pb.ServerAddress) string { return server.ToHttpAddress() } +// recoverTopologyIdFromHashicorpSnapshot reads the TopologyId from the latest +// hashicorp raft snapshot before state cleanup. +func recoverTopologyIdFromHashicorpSnapshot(dataDir string, topo *topology.Topology) { + fss, err := raft.NewFileSnapshotStore(dataDir, 1, io.Discard) + if err != nil { + return + } + snapshots, err := fss.List() + if err != nil || len(snapshots) == 0 { + return + } + _, rc, err := fss.Open(snapshots[0].ID) + if err != nil { + return + } + defer rc.Close() + + if b, err := io.ReadAll(rc); err == nil { + recoverTopologyIdFromState(b, topo) + } +} + func (s *RaftServer) AddPeersConfiguration() (cfg raft.Configuration) { for _, peer := range s.peers { cfg.Servers = append(cfg.Servers, raft.Server{ @@ -168,6 +191,8 @@ func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) { } if option.RaftBootstrap { + recoverTopologyIdFromHashicorpSnapshot(s.dataDir, option.Topo) + os.RemoveAll(path.Join(s.dataDir, ldbFile)) os.RemoveAll(path.Join(s.dataDir, sdbFile)) os.RemoveAll(path.Join(s.dataDir, "snapshots")) diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index ce35f6562..938219325 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -2,10 +2,13 @@ package weed_server import ( "encoding/json" + "fmt" + "hash/crc32" "io" "math/rand/v2" "os" "path" + "sort" "sync" "time" @@ -142,11 +145,20 @@ func NewRaftServer(option *RaftServerOption) (*RaftServer, error) { glog.V(0).Infof("Starting RaftServer with %v", option.ServerAddr) if !option.RaftResumeState { + // Recover the TopologyId from the snapshot before clearing state. + // The TopologyId is a cluster identity used for license validation + // and must survive raft state cleanup. + recoverTopologyIdFromSnapshot(s.dataDir, option.Topo) + // clear previous log to ensure fresh start os.RemoveAll(path.Join(s.dataDir, "log")) // always clear previous metadata os.RemoveAll(path.Join(s.dataDir, "conf")) os.RemoveAll(path.Join(s.dataDir, "snapshot")) + // clear persisted vote state (currentTerm/votedFor) so that stale + // terms from previous runs cannot cause election conflicts when the + // log has been wiped. + os.Remove(path.Join(s.dataDir, "state")) } if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), os.ModePerm); err != nil { return nil, err @@ -208,6 +220,47 @@ func (s *RaftServer) Peers() (members []string) { return } +// recoverTopologyIdFromSnapshot reads the TopologyId from the latest +// seaweedfs/raft snapshot before state cleanup. +func recoverTopologyIdFromSnapshot(dataDir string, topo *topology.Topology) { + snapshotDir := path.Join(dataDir, "snapshot") + dir, err := os.Open(snapshotDir) + if err != nil { + return + } + defer dir.Close() + filenames, err := dir.Readdirnames(-1) + if err != nil || len(filenames) == 0 { + return + } + + sort.Strings(filenames) + file, err := os.Open(path.Join(snapshotDir, filenames[len(filenames)-1])) + if err != nil { + return + } + defer file.Close() + + // Snapshot format: 8-hex-digit CRC32 checksum, newline, JSON body. + var checksum uint32 + if _, err := fmt.Fscanf(file, "%08x\n", &checksum); err != nil { + return + } + b, err := io.ReadAll(file) + if err != nil || crc32.ChecksumIEEE(b) != checksum { + return + } + + // The snapshot JSON wraps the FSM state in a "state" field. + var snap struct { + State json.RawMessage `json:"state"` + } + if err := json.Unmarshal(b, &snap); err != nil || len(snap.State) == 0 { + return + } + recoverTopologyIdFromState(snap.State, topo) +} + func (s *RaftServer) DoJoinCommand() { glog.V(0).Infoln("Initializing new cluster") diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index f3ccaf2e2..9f705bbcc 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -183,16 +183,29 @@ func (mc *MasterClient) SetOnPeerUpdateFn(onPeerUpdate func(update *master_pb.Cl func (mc *MasterClient) tryAllMasters(ctx context.Context) { var nextHintedLeader pb.ServerAddress + failedMasters := make(map[pb.ServerAddress]struct{}) mc.masters.RefreshBySrvIfAvailable() for _, master := range mc.masters.GetInstances() { + if _, failed := failedMasters[master]; failed { + continue + } nextHintedLeader = mc.tryConnectToMaster(ctx, master) for nextHintedLeader != "" { + if _, failed := failedMasters[nextHintedLeader]; failed { + break // don't follow redirect to a known-unreachable master + } select { case <-ctx.Done(): glog.V(0).Infof("Connection attempt to all masters stopped: %v", ctx.Err()) return default: - nextHintedLeader = mc.tryConnectToMaster(ctx, nextHintedLeader) + target := nextHintedLeader + nextHintedLeader = mc.tryConnectToMaster(ctx, target) + if nextHintedLeader == "" { + // connection to target failed; remember it so we skip + // stale redirects pointing back to it this cycle + failedMasters[target] = struct{}{} + } } } mc.setCurrentMaster("")