diff --git a/test/multi_master/cluster.go b/test/multi_master/cluster.go new file mode 100644 index 000000000..ac84ddb9c --- /dev/null +++ b/test/multi_master/cluster.go @@ -0,0 +1,437 @@ +package multi_master + +import ( + "bufio" + "bytes" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "os" + "os/exec" + "path/filepath" + "runtime" + "strconv" + "strings" + "sync" + "testing" + "time" +) + +const ( + waitTimeout = 30 * time.Second + waitTick = 200 * time.Millisecond +) + +// masterNode represents a single master process in the cluster. +type masterNode struct { + port int + grpcPort int + dataDir string + cmd *exec.Cmd + logFile string + stopped bool +} + +// MasterCluster manages a 3-node master raft cluster for integration tests. +type MasterCluster struct { + t testing.TB + weedBinary string + baseDir string + logsDir string + keepLogs bool + + nodes [3]*masterNode + mu sync.Mutex + + // peers string shared by all nodes, e.g. "127.0.0.1:9333,127.0.0.1:9334,127.0.0.1:9335" + peersStr string +} + +// clusterStatus is the JSON returned by /cluster/status. +type clusterStatus struct { + IsLeader bool `json:"IsLeader"` + Leader string `json:"Leader"` + Peers []string `json:"Peers"` +} + +// StartMasterCluster boots a 3-node master raft cluster and waits for a leader. +func StartMasterCluster(t testing.TB) *MasterCluster { + t.Helper() + + weedBinary, err := findOrBuildWeedBinary() + if err != nil { + t.Fatalf("resolve weed binary: %v", err) + } + + keepLogs := os.Getenv("MULTI_MASTER_IT_KEEP_LOGS") == "1" + baseDir, err := os.MkdirTemp("", "seaweedfs_multi_master_it_") + if err != nil { + t.Fatalf("create temp dir: %v", err) + } + logsDir := filepath.Join(baseDir, "logs") + os.MkdirAll(logsDir, 0o755) + + // Allocate 3 port pairs (http, grpc) atomically to prevent reuse. + portPairs, err := allocateMultipleMasterPortPairs(3) + if err != nil { + t.Fatalf("allocate ports: %v", err) + } + var nodes [3]*masterNode + var peerParts []string + for i, pp := range portPairs { + dataDir := filepath.Join(baseDir, fmt.Sprintf("m%d", i)) + os.MkdirAll(dataDir, 0o755) + nodes[i] = &masterNode{ + port: pp[0], + grpcPort: pp[1], + dataDir: dataDir, + logFile: filepath.Join(logsDir, fmt.Sprintf("master%d.log", i)), + } + peerParts = append(peerParts, fmt.Sprintf("127.0.0.1:%d", pp[0])) + } + + mc := &MasterCluster{ + t: t, + weedBinary: weedBinary, + baseDir: baseDir, + logsDir: logsDir, + keepLogs: keepLogs, + nodes: nodes, + peersStr: strings.Join(peerParts, ","), + } + + for i := range 3 { + mc.StartNode(i) + } + + if err := mc.WaitForLeader(waitTimeout); err != nil { + mc.DumpLogs() + mc.StopAll() + t.Fatalf("cluster did not elect a leader: %v", err) + } + + // Wait for TopologyId to be generated and propagated. This is async + // after leader election, and we need it committed before tests can + // reliably stop/restart nodes. + if err := mc.WaitForTopologyId(waitTimeout); err != nil { + mc.DumpLogs() + mc.StopAll() + t.Fatalf("TopologyId not generated: %v", err) + } + + t.Cleanup(func() { + mc.StopAll() + }) + return mc +} + +// StartNode starts the master process at the given index (0–2). 
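+// Starting a node that is already running is a no-op. A typical restart
+// sequence in a test is:
+//
+//	mc.StopNode(i)
+//	mc.StartNode(i)
+//	_ = mc.WaitForNodeReady(i, waitTimeout)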
+func (mc *MasterCluster) StartNode(i int) { + mc.t.Helper() + mc.mu.Lock() + defer mc.mu.Unlock() + + n := mc.nodes[i] + if n.cmd != nil && !n.stopped { + return // already running + } + + logFile, err := os.OpenFile(n.logFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + mc.t.Fatalf("create log for node %d: %v", i, err) + } + + args := []string{ + "master", + "-ip=127.0.0.1", + "-port=" + strconv.Itoa(n.port), + "-port.grpc=" + strconv.Itoa(n.grpcPort), + "-mdir=" + n.dataDir, + "-peers=" + mc.peersStr, + "-electionTimeout=3s", + "-volumeSizeLimitMB=32", + "-defaultReplication=000", + } + + n.cmd = exec.Command(mc.weedBinary, args...) + n.cmd.Dir = mc.baseDir + n.cmd.Stdout = logFile + n.cmd.Stderr = logFile + n.stopped = false + if err := n.cmd.Start(); err != nil { + mc.t.Fatalf("start node %d: %v", i, err) + } +} + +// StopNode gracefully stops the master at the given index. +func (mc *MasterCluster) StopNode(i int) { + mc.mu.Lock() + defer mc.mu.Unlock() + mc.stopNodeLocked(i) +} + +func (mc *MasterCluster) stopNodeLocked(i int) { + n := mc.nodes[i] + if n.cmd == nil || n.stopped { + return + } + n.stopped = true + _ = n.cmd.Process.Signal(os.Interrupt) + done := make(chan error, 1) + go func() { done <- n.cmd.Wait() }() + select { + case <-time.After(10 * time.Second): + _ = n.cmd.Process.Kill() + <-done + case <-done: + } +} + +// StopAll stops all running master nodes. +func (mc *MasterCluster) StopAll() { + mc.mu.Lock() + defer mc.mu.Unlock() + for i := range 3 { + mc.stopNodeLocked(i) + } + if !mc.keepLogs && !mc.t.Failed() { + os.RemoveAll(mc.baseDir) + } else if mc.baseDir != "" { + mc.t.Logf("multi-master logs kept at %s", mc.baseDir) + } +} + +// NodeURL returns the HTTP URL for node i. +func (mc *MasterCluster) NodeURL(i int) string { + return fmt.Sprintf("http://127.0.0.1:%d", mc.nodes[i].port) +} + +// NodeAddress returns "127.0.0.1:port" for node i. +func (mc *MasterCluster) NodeAddress(i int) string { + return fmt.Sprintf("127.0.0.1:%d", mc.nodes[i].port) +} + +// NodeGRPCAddress returns "127.0.0.1:grpcPort" for node i. +func (mc *MasterCluster) NodeGRPCAddress(i int) string { + return fmt.Sprintf("127.0.0.1:%d", mc.nodes[i].grpcPort) +} + +// IsNodeRunning returns true if the node at index i has a live process. +func (mc *MasterCluster) IsNodeRunning(i int) bool { + mc.mu.Lock() + defer mc.mu.Unlock() + n := mc.nodes[i] + return n.cmd != nil && !n.stopped +} + +// GetClusterStatus fetches /cluster/status from node i. +func (mc *MasterCluster) GetClusterStatus(i int) (*clusterStatus, error) { + client := &http.Client{Timeout: 2 * time.Second} + resp, err := client.Get(mc.NodeURL(i) + "/cluster/status") + if err != nil { + return nil, err + } + defer resp.Body.Close() + body, _ := io.ReadAll(resp.Body) + var cs clusterStatus + if err := json.Unmarshal(body, &cs); err != nil { + return nil, fmt.Errorf("parse cluster/status: %w (body: %s)", err, string(body)) + } + return &cs, nil +} + +// GetTopologyId fetches the TopologyId from /dir/status on node i. 
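+// A missing TopologyId field is not an error: the method returns ("", nil),
+// so callers should treat an empty string as "not yet generated".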
+func (mc *MasterCluster) GetTopologyId(i int) (string, error) { + client := &http.Client{Timeout: 2 * time.Second} + resp, err := client.Get(mc.NodeURL(i) + "/dir/status") + if err != nil { + return "", err + } + defer resp.Body.Close() + body, _ := io.ReadAll(resp.Body) + var raw map[string]any + if err := json.Unmarshal(body, &raw); err != nil { + return "", err + } + if id, ok := raw["TopologyId"].(string); ok { + return id, nil + } + return "", nil +} + +// FindLeader returns the index of the leader node and its address. +// Returns -1 if no leader is found. +func (mc *MasterCluster) FindLeader() (int, string) { + for i := range 3 { + if !mc.IsNodeRunning(i) { + continue + } + cs, err := mc.GetClusterStatus(i) + if err != nil { + continue + } + if cs.IsLeader { + return i, mc.NodeAddress(i) + } + } + return -1, "" +} + +// WaitForLeader polls until a leader is elected or timeout. +func (mc *MasterCluster) WaitForLeader(timeout time.Duration) error { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if idx, _ := mc.FindLeader(); idx >= 0 { + return nil + } + time.Sleep(waitTick) + } + return fmt.Errorf("no leader elected within %v", timeout) +} + +// WaitForNewLeader waits for a leader that is different from the given address. +func (mc *MasterCluster) WaitForNewLeader(oldLeaderAddr string, timeout time.Duration) (int, string, error) { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + idx, addr := mc.FindLeader() + if idx >= 0 && addr != oldLeaderAddr { + return idx, addr, nil + } + time.Sleep(waitTick) + } + return -1, "", fmt.Errorf("no new leader (different from %s) within %v", oldLeaderAddr, timeout) +} + +// WaitForTopologyId waits until the leader reports a non-empty TopologyId. +func (mc *MasterCluster) WaitForTopologyId(timeout time.Duration) error { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if idx, _ := mc.FindLeader(); idx >= 0 { + if id, err := mc.GetTopologyId(idx); err == nil && id != "" { + return nil + } + } + time.Sleep(waitTick) + } + return fmt.Errorf("TopologyId not available within %v", timeout) +} + +// WaitForNodeReady waits for node i to respond to HTTP. +func (mc *MasterCluster) WaitForNodeReady(i int, timeout time.Duration) error { + client := &http.Client{Timeout: 1 * time.Second} + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + resp, err := client.Get(mc.NodeURL(i) + "/cluster/status") + if err == nil { + resp.Body.Close() + return nil + } + time.Sleep(waitTick) + } + return fmt.Errorf("node %d not ready within %v", i, timeout) +} + +// DumpLogs prints the tail of all master logs. +func (mc *MasterCluster) DumpLogs() { + for i := range 3 { + mc.t.Logf("=== master%d log tail ===\n%s", i, mc.tailLog(i)) + } +} + +func (mc *MasterCluster) tailLog(i int) string { + f, err := os.Open(mc.nodes[i].logFile) + if err != nil { + return "(no log)" + } + defer f.Close() + scanner := bufio.NewScanner(f) + lines := make([]string, 0, 50) + for scanner.Scan() { + lines = append(lines, scanner.Text()) + if len(lines) > 50 { + lines = lines[1:] + } + } + return strings.Join(lines, "\n") +} + +// --- port and binary helpers (adapted from test/volume_server/framework) --- + +// allocateMultipleMasterPortPairs finds n non-overlapping (http, grpc) port +// pairs, holding all listeners until all are found, then releasing them +// together to avoid races between consecutive allocations. 
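+// Each pair is [2]int{httpPort, grpcPort} with grpcPort = httpPort + 10000,
+// matching weed's default HTTP-to-gRPC port offset. A sketch of the result:
+//
+//	pairs, _ := allocateMultipleMasterPortPairs(3)
+//	// e.g. [][2]int{{10000, 20000}, {10001, 20001}, {10002, 20002}}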
+func allocateMultipleMasterPortPairs(n int) ([][2]int, error) { + var listeners []net.Listener + var pairs [][2]int + + defer func() { + for _, l := range listeners { + l.Close() + } + }() + + for masterPort := 10000; masterPort <= 55535 && len(pairs) < n; masterPort++ { + grpcPort := masterPort + 10000 + l1, err := net.Listen("tcp", net.JoinHostPort("127.0.0.1", strconv.Itoa(masterPort))) + if err != nil { + continue + } + l2, err := net.Listen("tcp", net.JoinHostPort("127.0.0.1", strconv.Itoa(grpcPort))) + if err != nil { + l1.Close() + continue + } + listeners = append(listeners, l1, l2) + pairs = append(pairs, [2]int{masterPort, grpcPort}) + } + + if len(pairs) < n { + return nil, fmt.Errorf("could only allocate %d of %d master port pairs", len(pairs), n) + } + return pairs, nil +} + +func findOrBuildWeedBinary() (string, error) { + if fromEnv := os.Getenv("WEED_BINARY"); fromEnv != "" { + if isExecutableFile(fromEnv) { + return fromEnv, nil + } + return "", fmt.Errorf("WEED_BINARY not executable: %s", fromEnv) + } + + repoRoot := "" + if _, file, _, ok := runtime.Caller(0); ok { + repoRoot = filepath.Clean(filepath.Join(filepath.Dir(file), "..", "..")) + } + if repoRoot == "" { + return "", fmt.Errorf("unable to detect repository root") + } + + // Check if already built + binDir := filepath.Join(os.TempDir(), "seaweedfs_multi_master_it_bin") + os.MkdirAll(binDir, 0o755) + binPath := filepath.Join(binDir, "weed") + if isExecutableFile(binPath) { + return binPath, nil + } + + cmd := exec.Command("go", "build", "-o", binPath, ".") + cmd.Dir = filepath.Join(repoRoot, "weed") + var out bytes.Buffer + cmd.Stdout = &out + cmd.Stderr = &out + if err := cmd.Run(); err != nil { + return "", fmt.Errorf("build weed binary: %w\n%s", err, out.String()) + } + return binPath, nil +} + +func isExecutableFile(path string) bool { + info, err := os.Stat(path) + if err != nil || info.IsDir() { + return false + } + return info.Mode().Perm()&0o111 != 0 +} diff --git a/test/multi_master/failover_test.go b/test/multi_master/failover_test.go new file mode 100644 index 000000000..45ba204d4 --- /dev/null +++ b/test/multi_master/failover_test.go @@ -0,0 +1,313 @@ +package multi_master + +import ( + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb" +) + +const ( + // Election timeout is 3s in our cluster config; allow generous margin. + leaderElectionTimeout = 20 * time.Second +) + +// TestLeaderDownAndRecoverQuickly verifies that when the leader is stopped and +// restarted quickly, the cluster re-elects a leader and the restarted node +// rejoins as a follower. TopologyId must be consistent across all nodes. +func TestLeaderDownAndRecoverQuickly(t *testing.T) { + mc := StartMasterCluster(t) + + // Record initial state. + leaderIdx, leaderAddr := mc.FindLeader() + if leaderIdx < 0 { + t.Fatal("no leader found after cluster start") + } + t.Logf("initial leader: node %d at %s", leaderIdx, leaderAddr) + + topologyId, err := mc.GetTopologyId(leaderIdx) + if err != nil || topologyId == "" { + t.Fatalf("failed to get initial TopologyId: %v", err) + } + t.Logf("initial TopologyId: %s", topologyId) + + // Stop the leader. + mc.StopNode(leaderIdx) + t.Logf("stopped leader node %d", leaderIdx) + + // Wait for a new leader from the remaining 2 nodes. 
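+	// Two of three voters remain, which still satisfies quorum, so an
+	// election should complete within leaderElectionTimeout.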
+ newLeaderIdx, newLeaderAddr, err := mc.WaitForNewLeader(leaderAddr, leaderElectionTimeout) + if err != nil { + mc.DumpLogs() + t.Fatalf("new leader not elected after stopping old leader: %v", err) + } + t.Logf("new leader: node %d at %s", newLeaderIdx, newLeaderAddr) + + // Restart the old leader quickly. + mc.StartNode(leaderIdx) + if err := mc.WaitForNodeReady(leaderIdx, waitTimeout); err != nil { + mc.DumpLogs() + t.Fatalf("restarted node %d not ready: %v", leaderIdx, err) + } + t.Logf("restarted node %d", leaderIdx) + + // Give raft time to settle. + time.Sleep(3 * time.Second) + + // Verify leader is stable. + finalLeaderIdx, _ := mc.FindLeader() + if finalLeaderIdx < 0 { + mc.DumpLogs() + t.Fatal("no leader after restarting old leader node") + } + + // Verify TopologyId is consistent across all nodes. + assertTopologyIdConsistent(t, mc, topologyId) +} + +// TestLeaderDownSlowRecover verifies that when the leader goes down and takes +// a long time to come back, the remaining 2 nodes elect a new leader and the +// cluster continues to function. When the slow node returns, it rejoins. +func TestLeaderDownSlowRecover(t *testing.T) { + mc := StartMasterCluster(t) + + leaderIdx, leaderAddr := mc.FindLeader() + if leaderIdx < 0 { + t.Fatal("no leader found") + } + topologyId, err := mc.GetTopologyId(leaderIdx) + if err != nil || topologyId == "" { + t.Fatalf("failed to get initial TopologyId: %v", err) + } + t.Logf("initial leader: node %d, TopologyId: %s", leaderIdx, topologyId) + + // Stop the leader. + mc.StopNode(leaderIdx) + + // Wait for a new leader. + newLeaderIdx, _, err := mc.WaitForNewLeader(leaderAddr, leaderElectionTimeout) + if err != nil { + mc.DumpLogs() + t.Fatalf("new leader not elected: %v", err) + } + t.Logf("new leader: node %d", newLeaderIdx) + + // Verify cluster functions with only 2 nodes (quorum is 2/3). + cs, err := mc.GetClusterStatus(newLeaderIdx) + if err != nil { + mc.DumpLogs() + t.Fatalf("cannot get cluster status from new leader: %v", err) + } + if !cs.IsLeader { + t.Fatalf("node %d claims not to be leader", newLeaderIdx) + } + + // Simulate slow recovery: wait significantly longer than election timeout. + t.Log("simulating slow recovery (10 seconds)...") + time.Sleep(10 * time.Second) + + // Verify leader is still stable during the outage. + stableLeaderIdx, _ := mc.FindLeader() + if stableLeaderIdx < 0 { + mc.DumpLogs() + t.Fatal("leader lost during extended outage of one node") + } + + // Restart the downed node. + mc.StartNode(leaderIdx) + if err := mc.WaitForNodeReady(leaderIdx, waitTimeout); err != nil { + mc.DumpLogs() + t.Fatalf("slow-recovered node %d not ready: %v", leaderIdx, err) + } + + time.Sleep(3 * time.Second) + assertTopologyIdConsistent(t, mc, topologyId) +} + +// TestTwoMastersDownAndRestart verifies that when 2 of 3 masters go down +// (losing quorum), the cluster cannot elect a leader. When both restart, +// a leader is elected and TopologyId is preserved. +func TestTwoMastersDownAndRestart(t *testing.T) { + mc := StartMasterCluster(t) + + leaderIdx, _ := mc.FindLeader() + if leaderIdx < 0 { + t.Fatal("no leader found") + } + topologyId, err := mc.GetTopologyId(leaderIdx) + if err != nil || topologyId == "" { + t.Fatalf("failed to get initial TopologyId: %v", err) + } + t.Logf("initial TopologyId: %s", topologyId) + + // Determine which 2 nodes to stop (stop the leader + one follower). 
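+	// Modulo-3 arithmetic keeps the choice deterministic regardless of
+	// which node currently holds leadership.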
+ down1 := leaderIdx + down2 := (leaderIdx + 1) % 3 + survivor := (leaderIdx + 2) % 3 + t.Logf("stopping nodes %d and %d, keeping node %d", down1, down2, survivor) + + mc.StopNode(down1) + mc.StopNode(down2) + + // The surviving node alone cannot form a quorum — no leader expected. + // Wait long enough for any stale leadership to expire (election timeout + // is 3s in our config, quorum check fires every election timeout). + time.Sleep(5 * time.Second) + soloLeaderIdx, _ := mc.FindLeader() + if soloLeaderIdx >= 0 { + // It's possible the survivor briefly thinks it's leader before stepping down. + // Give it time to realize it lost quorum. + time.Sleep(5 * time.Second) + soloLeaderIdx, _ = mc.FindLeader() + } + if soloLeaderIdx >= 0 { + mc.DumpLogs() + t.Fatalf("expected no leader with only 1 of 3 nodes, but node %d claims leadership", soloLeaderIdx) + } + + // Restart both downed nodes. + mc.StartNode(down1) + mc.StartNode(down2) + for _, i := range []int{down1, down2} { + if err := mc.WaitForNodeReady(i, waitTimeout); err != nil { + mc.DumpLogs() + t.Fatalf("restarted node %d not ready: %v", i, err) + } + } + + // Wait for leader election. + if err := mc.WaitForLeader(leaderElectionTimeout); err != nil { + mc.DumpLogs() + t.Fatalf("no leader after restarting 2 downed nodes: %v", err) + } + + time.Sleep(3 * time.Second) + assertTopologyIdConsistent(t, mc, topologyId) +} + +// TestAllMastersDownAndRestart verifies that when all 3 masters are stopped +// and restarted, the cluster elects a leader and all nodes agree on a +// TopologyId. With RaftResumeState=false (default), raft state is cleared on +// restart. The TopologyId is recovered from snapshots when available; on a +// short-lived cluster that hasn't taken snapshots on all nodes, a new +// TopologyId may be generated — but all nodes must still agree. +func TestAllMastersDownAndRestart(t *testing.T) { + mc := StartMasterCluster(t) + + leaderIdx, _ := mc.FindLeader() + if leaderIdx < 0 { + t.Fatal("no leader found") + } + topologyId, _ := mc.GetTopologyId(leaderIdx) + if topologyId == "" { + t.Fatal("no TopologyId on initial leader") + } + t.Logf("initial TopologyId: %s", topologyId) + + // Stop all nodes. + for i := range 3 { + mc.StopNode(i) + } + t.Log("all nodes stopped") + + time.Sleep(2 * time.Second) + + // Restart all nodes. + for i := range 3 { + mc.StartNode(i) + } + for i := range 3 { + if err := mc.WaitForNodeReady(i, waitTimeout); err != nil { + mc.DumpLogs() + t.Fatalf("node %d not ready after full restart: %v", i, err) + } + } + + // Wait for leader. + if err := mc.WaitForLeader(leaderElectionTimeout); err != nil { + mc.DumpLogs() + t.Fatalf("no leader after full cluster restart: %v", err) + } + + newLeaderIdx, _ := mc.FindLeader() + t.Logf("leader after full restart: node %d", newLeaderIdx) + + time.Sleep(3 * time.Second) + + // All nodes must agree on a TopologyId (may differ from original if + // snapshots were not yet taken on all nodes before shutdown). 
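+	// The assertion below therefore compares against the post-restart
+	// leader's value rather than the pre-restart one.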
+ newTopologyId, err := mc.GetTopologyId(newLeaderIdx) + if err != nil || newTopologyId == "" { + mc.DumpLogs() + t.Fatal("no TopologyId after full restart") + } + if newTopologyId == topologyId { + t.Logf("TopologyId preserved across full restart: %s", topologyId) + } else { + t.Logf("TopologyId changed (expected for short-lived cluster without snapshots): %s -> %s", topologyId, newTopologyId) + } + assertTopologyIdConsistent(t, mc, newTopologyId) +} + +// TestLeaderConsistencyAcrossNodes verifies that all nodes agree on who the +// leader is and report the same TopologyId. +func TestLeaderConsistencyAcrossNodes(t *testing.T) { + mc := StartMasterCluster(t) + + // Allow cluster to stabilize. + time.Sleep(3 * time.Second) + + leaderIdx, leaderAddr := mc.FindLeader() + if leaderIdx < 0 { + t.Fatal("no leader found") + } + t.Logf("leader: node %d at %s", leaderIdx, leaderAddr) + + // Every node should agree on the leader. + for i := range 3 { + cs, err := mc.GetClusterStatus(i) + if err != nil { + t.Fatalf("node %d cluster/status error: %v", i, err) + } + if i == leaderIdx { + if !cs.IsLeader { + t.Errorf("node %d should be leader but IsLeader=false", i) + } + } else { + if cs.IsLeader { + t.Errorf("node %d should not be leader but IsLeader=true", i) + } + // cs.Leader is a ServerAddress like "127.0.0.1:10000.20000"; + // convert to HTTP address for comparison with leaderAddr. + leaderHttp := pb.ServerAddress(cs.Leader).ToHttpAddress() + if leaderHttp != leaderAddr { + t.Errorf("node %d reports leader %q (http: %s), expected %q", i, cs.Leader, leaderHttp, leaderAddr) + } + } + } + + // All nodes should have the same TopologyId. + topologyId, _ := mc.GetTopologyId(leaderIdx) + if topologyId == "" { + t.Fatal("leader has no TopologyId") + } + assertTopologyIdConsistent(t, mc, topologyId) +} + +// assertTopologyIdConsistent verifies that all running nodes report the expected TopologyId. +func assertTopologyIdConsistent(t *testing.T, mc *MasterCluster, expectedId string) { + t.Helper() + for i := range 3 { + if !mc.IsNodeRunning(i) { + continue + } + id, err := mc.GetTopologyId(i) + if err != nil { + t.Errorf("node %d: failed to get TopologyId: %v", i, err) + continue + } + if id != expectedId { + t.Errorf("node %d: TopologyId=%q, expected %q", i, id, expectedId) + } + } +} diff --git a/weed/command/master.go b/weed/command/master.go index 565c9cc58..dd880acd4 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "fmt" + "math/rand/v2" "net" "net/http" "os" @@ -257,12 +258,27 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { // For multi-master mode with non-Hashicorp raft, wait and check if we should join if !*masterOption.raftHashicorp && !isSingleMaster { go func() { - time.Sleep(raftJoinCheckDelay) + // Stagger bootstrap by peer index so masters don't all check + // simultaneously. Peer 0 waits ~1.5s, peer 1 ~3s, etc. 
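+			// The (rand.Float64()*0.25 + 1) factor contributes up to 25%
+			// jitter: with the ~1.5s base noted above, peer 0 sleeps
+			// roughly 1.5-1.9s and peer 1 roughly 3.0-3.75s.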
+ idx := peerIndex(myMasterAddress, peers) + delay := time.Duration(float64(raftJoinCheckDelay) * (rand.Float64()*0.25 + 1) * float64(idx+1)) + glog.V(0).Infof("bootstrap check in %v (peer index %d of %d)", delay, idx, len(peers)) + time.Sleep(delay) ms.Topo.RaftServerAccessLock.RLock() isEmptyMaster := ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() - if isEmptyMaster && isTheFirstOne(myMasterAddress, peers) && ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" { - raftServer.DoJoinCommand() + isFirst := idx == 0 + if isEmptyMaster && isFirst { + existingLeader := ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) + if existingLeader == "" { + raftServer.DoJoinCommand() + } else { + glog.V(0).Infof("skip bootstrap: existing leader %s found from peers", existingLeader) + } + } else if !isEmptyMaster { + glog.V(0).Infof("skip bootstrap: leader=%q logEmpty=%v", ms.Topo.RaftServer.Leader(), ms.Topo.RaftServer.IsLogEmpty()) + } else { + glog.V(0).Infof("skip bootstrap: %v is not the first master in peers (index %d)", myMasterAddress, idx) } ms.Topo.RaftServerAccessLock.RUnlock() }() @@ -385,14 +401,19 @@ func normalizeMasterPeerAddress(peer pb.ServerAddress, self pb.ServerAddress) pb return pb.NewServerAddressWithGrpcPort(peer.ToHttpAddress(), grpcPortValue) } -func isTheFirstOne(self pb.ServerAddress, peers []pb.ServerAddress) bool { +// peerIndex returns the 0-based position of self in the sorted peer list. +// Peer 0 is the designated bootstrap node. Returns -1 if self is not found. +func peerIndex(self pb.ServerAddress, peers []pb.ServerAddress) int { slices.SortFunc(peers, func(a, b pb.ServerAddress) int { return strings.Compare(a.ToHttpAddress(), b.ToHttpAddress()) }) - if len(peers) <= 0 { - return true + for i, peer := range peers { + if peer.ToHttpAddress() == self.ToHttpAddress() { + return i + } } - return self.ToHttpAddress() == peers[0].ToHttpAddress() + glog.Warningf("peerIndex: self %s not found in peers %v", self, peers) + return -1 } func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOption { diff --git a/weed/command/master_test.go b/weed/command/master_test.go index b92254606..e35a9a70b 100644 --- a/weed/command/master_test.go +++ b/weed/command/master_test.go @@ -6,7 +6,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb" ) -func TestIsTheFirstOneIgnoresGrpcPort(t *testing.T) { +func TestPeerIndexIgnoresGrpcPort(t *testing.T) { self := pb.ServerAddress("127.0.0.1:9000.19000") peers := []pb.ServerAddress{ "127.0.0.1:9000", @@ -14,8 +14,8 @@ func TestIsTheFirstOneIgnoresGrpcPort(t *testing.T) { "127.0.0.1:9003.19003", } - if !isTheFirstOne(self, peers) { - t.Fatalf("expected first peer match by HTTP address between %q and %+v", self, peers) + if idx := peerIndex(self, peers); idx != 0 { + t.Fatalf("expected peer index 0 for %q among %+v, got %d", self, peers, idx) } } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index b65869107..f403ff771 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -297,11 +297,28 @@ func (ms *MasterServer) ensureTopologyId() { currentId := ms.Topo.GetTopologyId() glog.V(1).Infof("ensureTopologyId: current TopologyId after barrier: %s", currentId) + prevId := ms.Topo.GetTopologyId() + EnsureTopologyId(ms.Topo, func() bool { return ms.Topo.IsLeader() }, func(topologyId string) error { return ms.syncRaftForTopologyId(topologyId) }) + + // If a new TopologyId was generated, take a snapshot so it survives + // raft state cleanup on 
future non-resume restarts. + if prevId == "" && ms.Topo.GetTopologyId() != "" { + ms.Topo.RaftServerAccessLock.RLock() + if ms.Topo.RaftServer != nil { + if err := ms.Topo.RaftServer.TakeSnapshot(); err != nil { + glog.Warningf("snapshot after TopologyId generation: %v", err) + } else { + glog.V(0).Infof("snapshot taken to persist TopologyId %s", ms.Topo.GetTopologyId()) + } + } + // Hashicorp raft snapshots are handled automatically. + ms.Topo.RaftServerAccessLock.RUnlock() + } } func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc { diff --git a/weed/server/raft_common.go b/weed/server/raft_common.go index 09aeaaac0..9a87a9952 100644 --- a/weed/server/raft_common.go +++ b/weed/server/raft_common.go @@ -1,6 +1,7 @@ package weed_server import ( + "encoding/json" "time" "github.com/google/uuid" @@ -8,6 +9,23 @@ import ( "github.com/seaweedfs/seaweedfs/weed/topology" ) +// recoverTopologyIdFromState restores the TopologyId from serialized FSM +// state bytes (JSON-encoded MaxVolumeIdCommand). Both raft implementations +// call this after reading their snapshot in their own format. +func recoverTopologyIdFromState(fsmState []byte, topo *topology.Topology) { + if topo.GetTopologyId() != "" { + return + } + var cmd topology.MaxVolumeIdCommand + if err := json.Unmarshal(fsmState, &cmd); err != nil { + return + } + if cmd.TopologyId != "" { + topo.SetTopologyId(cmd.TopologyId) + glog.V(0).Infof("Recovered TopologyId from snapshot: %s", cmd.TopologyId) + } +} + // EnsureTopologyId ensures that a TopologyId is generated and persisted if it's currently missing. // It uses the provided checkLeaderFn to verify leadership and persistFn to save the new ID. func EnsureTopologyId(topo *topology.Topology, checkLeaderFn func() bool, persistFn func(string) error) { diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go index 99d31b1d1..575478aec 100644 --- a/weed/server/raft_hashicorp.go +++ b/weed/server/raft_hashicorp.go @@ -6,6 +6,7 @@ package weed_server import ( "encoding/json" "fmt" + "io" "math/rand/v2" "os" "path" @@ -56,6 +57,28 @@ func raftServerID(server pb.ServerAddress) string { return server.ToHttpAddress() } +// recoverTopologyIdFromHashicorpSnapshot reads the TopologyId from the latest +// hashicorp raft snapshot before state cleanup. 
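+// Recovery is best-effort: any failure to list, open, or read a snapshot is
+// silently ignored and the TopologyId simply remains empty.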
+func recoverTopologyIdFromHashicorpSnapshot(dataDir string, topo *topology.Topology) { + fss, err := raft.NewFileSnapshotStore(dataDir, 1, io.Discard) + if err != nil { + return + } + snapshots, err := fss.List() + if err != nil || len(snapshots) == 0 { + return + } + _, rc, err := fss.Open(snapshots[0].ID) + if err != nil { + return + } + defer rc.Close() + + if b, err := io.ReadAll(rc); err == nil { + recoverTopologyIdFromState(b, topo) + } +} + func (s *RaftServer) AddPeersConfiguration() (cfg raft.Configuration) { for _, peer := range s.peers { cfg.Servers = append(cfg.Servers, raft.Server{ @@ -168,6 +191,8 @@ func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) { } if option.RaftBootstrap { + recoverTopologyIdFromHashicorpSnapshot(s.dataDir, option.Topo) + os.RemoveAll(path.Join(s.dataDir, ldbFile)) os.RemoveAll(path.Join(s.dataDir, sdbFile)) os.RemoveAll(path.Join(s.dataDir, "snapshots")) diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index ce35f6562..938219325 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -2,10 +2,13 @@ package weed_server import ( "encoding/json" + "fmt" + "hash/crc32" "io" "math/rand/v2" "os" "path" + "sort" "sync" "time" @@ -142,11 +145,20 @@ func NewRaftServer(option *RaftServerOption) (*RaftServer, error) { glog.V(0).Infof("Starting RaftServer with %v", option.ServerAddr) if !option.RaftResumeState { + // Recover the TopologyId from the snapshot before clearing state. + // The TopologyId is a cluster identity used for license validation + // and must survive raft state cleanup. + recoverTopologyIdFromSnapshot(s.dataDir, option.Topo) + // clear previous log to ensure fresh start os.RemoveAll(path.Join(s.dataDir, "log")) // always clear previous metadata os.RemoveAll(path.Join(s.dataDir, "conf")) os.RemoveAll(path.Join(s.dataDir, "snapshot")) + // clear persisted vote state (currentTerm/votedFor) so that stale + // terms from previous runs cannot cause election conflicts when the + // log has been wiped. + os.Remove(path.Join(s.dataDir, "state")) } if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), os.ModePerm); err != nil { return nil, err @@ -208,6 +220,47 @@ func (s *RaftServer) Peers() (members []string) { return } +// recoverTopologyIdFromSnapshot reads the TopologyId from the latest +// seaweedfs/raft snapshot before state cleanup. +func recoverTopologyIdFromSnapshot(dataDir string, topo *topology.Topology) { + snapshotDir := path.Join(dataDir, "snapshot") + dir, err := os.Open(snapshotDir) + if err != nil { + return + } + defer dir.Close() + filenames, err := dir.Readdirnames(-1) + if err != nil || len(filenames) == 0 { + return + } + + sort.Strings(filenames) + file, err := os.Open(path.Join(snapshotDir, filenames[len(filenames)-1])) + if err != nil { + return + } + defer file.Close() + + // Snapshot format: 8-hex-digit CRC32 checksum, newline, JSON body. + var checksum uint32 + if _, err := fmt.Fscanf(file, "%08x\n", &checksum); err != nil { + return + } + b, err := io.ReadAll(file) + if err != nil || crc32.ChecksumIEEE(b) != checksum { + return + } + + // The snapshot JSON wraps the FSM state in a "state" field. 
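+	// If that field is absent or malformed we bail out rather than guess
+	// at the on-disk layout.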
+ var snap struct { + State json.RawMessage `json:"state"` + } + if err := json.Unmarshal(b, &snap); err != nil || len(snap.State) == 0 { + return + } + recoverTopologyIdFromState(snap.State, topo) +} + func (s *RaftServer) DoJoinCommand() { glog.V(0).Infoln("Initializing new cluster") diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index f3ccaf2e2..9f705bbcc 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -183,16 +183,29 @@ func (mc *MasterClient) SetOnPeerUpdateFn(onPeerUpdate func(update *master_pb.Cl func (mc *MasterClient) tryAllMasters(ctx context.Context) { var nextHintedLeader pb.ServerAddress + failedMasters := make(map[pb.ServerAddress]struct{}) mc.masters.RefreshBySrvIfAvailable() for _, master := range mc.masters.GetInstances() { + if _, failed := failedMasters[master]; failed { + continue + } nextHintedLeader = mc.tryConnectToMaster(ctx, master) for nextHintedLeader != "" { + if _, failed := failedMasters[nextHintedLeader]; failed { + break // don't follow redirect to a known-unreachable master + } select { case <-ctx.Done(): glog.V(0).Infof("Connection attempt to all masters stopped: %v", ctx.Err()) return default: - nextHintedLeader = mc.tryConnectToMaster(ctx, nextHintedLeader) + target := nextHintedLeader + nextHintedLeader = mc.tryConnectToMaster(ctx, target) + if nextHintedLeader == "" { + // connection to target failed; remember it so we skip + // stale redirects pointing back to it this cycle + failedMasters[target] = struct{}{} + } } } mc.setCurrentMaster("")