package multi_master

import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net"
	"net/http"
	"os"
	"os/exec"
	"path/filepath"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"testing"
	"time"
)

const (
	waitTimeout = 30 * time.Second
	waitTick    = 200 * time.Millisecond
)

// masterNode represents a single master process in the cluster.
type masterNode struct {
	port     int
	grpcPort int
	dataDir  string
	cmd      *exec.Cmd
	logFile  string
	stopped  bool
}

// MasterCluster manages a 3-node master raft cluster for integration tests.
type MasterCluster struct {
	t          testing.TB
	weedBinary string
	baseDir    string
	logsDir    string
	keepLogs   bool
	nodes      [3]*masterNode
	mu         sync.Mutex

	// peers string shared by all nodes, e.g. "127.0.0.1:9333,127.0.0.1:9334,127.0.0.1:9335"
	peersStr string
}

// clusterStatus is the JSON returned by /cluster/status.
type clusterStatus struct {
	IsLeader bool     `json:"IsLeader"`
	Leader   string   `json:"Leader"`
	Peers    []string `json:"Peers"`
}

// StartMasterCluster boots a 3-node master raft cluster and waits for a leader.
func StartMasterCluster(t testing.TB) *MasterCluster {
	t.Helper()

	weedBinary, err := findOrBuildWeedBinary()
	if err != nil {
		t.Fatalf("resolve weed binary: %v", err)
	}

	keepLogs := os.Getenv("MULTI_MASTER_IT_KEEP_LOGS") == "1"
	baseDir, err := os.MkdirTemp("", "seaweedfs_multi_master_it_")
	if err != nil {
		t.Fatalf("create temp dir: %v", err)
	}
	logsDir := filepath.Join(baseDir, "logs")
	if err := os.MkdirAll(logsDir, 0o755); err != nil {
		t.Fatalf("create logs dir: %v", err)
	}

	// Allocate 3 port pairs (http, grpc) atomically to prevent reuse.
	portPairs, err := allocateMultipleMasterPortPairs(3)
	if err != nil {
		t.Fatalf("allocate ports: %v", err)
	}

	var nodes [3]*masterNode
	var peerParts []string
	for i, pp := range portPairs {
		dataDir := filepath.Join(baseDir, fmt.Sprintf("m%d", i))
		if err := os.MkdirAll(dataDir, 0o755); err != nil {
			t.Fatalf("create data dir: %v", err)
		}
		nodes[i] = &masterNode{
			port:     pp[0],
			grpcPort: pp[1],
			dataDir:  dataDir,
			logFile:  filepath.Join(logsDir, fmt.Sprintf("master%d.log", i)),
		}
		peerParts = append(peerParts, fmt.Sprintf("127.0.0.1:%d", pp[0]))
	}

	mc := &MasterCluster{
		t:          t,
		weedBinary: weedBinary,
		baseDir:    baseDir,
		logsDir:    logsDir,
		keepLogs:   keepLogs,
		nodes:      nodes,
		peersStr:   strings.Join(peerParts, ","),
	}

	for i := range 3 {
		mc.StartNode(i)
	}

	if err := mc.WaitForLeader(waitTimeout); err != nil {
		mc.DumpLogs()
		mc.StopAll()
		t.Fatalf("cluster did not elect a leader: %v", err)
	}

	// Wait for TopologyId to be generated and propagated. This is async
	// after leader election, and we need it committed before tests can
	// reliably stop/restart nodes.
	if err := mc.WaitForTopologyId(waitTimeout); err != nil {
		mc.DumpLogs()
		mc.StopAll()
		t.Fatalf("TopologyId not generated: %v", err)
	}

	t.Cleanup(func() { mc.StopAll() })
	return mc
}

// StartNode starts the master process at the given index (0–2).
func (mc *MasterCluster) StartNode(i int) {
	mc.t.Helper()
	mc.mu.Lock()
	defer mc.mu.Unlock()

	n := mc.nodes[i]
	if n.cmd != nil && !n.stopped {
		return // already running
	}

	logFile, err := os.OpenFile(n.logFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
	if err != nil {
		mc.t.Fatalf("create log for node %d: %v", i, err)
	}

	args := []string{
		"master",
		"-ip=127.0.0.1",
		"-port=" + strconv.Itoa(n.port),
		"-port.grpc=" + strconv.Itoa(n.grpcPort),
		"-mdir=" + n.dataDir,
		"-peers=" + mc.peersStr,
		"-electionTimeout=3s",
		"-volumeSizeLimitMB=32",
		"-defaultReplication=000",
	}
	n.cmd = exec.Command(mc.weedBinary, args...)
	n.cmd.Dir = mc.baseDir
	n.cmd.Stdout = logFile
	n.cmd.Stderr = logFile
	n.stopped = false
	if err := n.cmd.Start(); err != nil {
		mc.t.Fatalf("start node %d: %v", i, err)
	}
}
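// demonstrateClusterLifecycle is a minimal usage sketch of the framework
// above. It is not invoked by anything in this file, and the failover
// flow shown here is an illustrative assumption about how a test would
// use these helpers, not a prescribed test.
func demonstrateClusterLifecycle(t *testing.T) {
	mc := StartMasterCluster(t) // boots all 3 nodes and waits for a leader
	idx, addr := mc.FindLeader()
	if idx < 0 {
		t.Fatal("expected an elected leader")
	}
	mc.StopNode(idx) // stop the current leader to force re-election
	if _, _, err := mc.WaitForNewLeader(addr, waitTimeout); err != nil {
		t.Fatal(err)
	}
	// Teardown is automatic: StartMasterCluster registers StopAll via t.Cleanup.
}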
// StopNode gracefully stops the master at the given index.
func (mc *MasterCluster) StopNode(i int) {
	mc.mu.Lock()
	defer mc.mu.Unlock()
	mc.stopNodeLocked(i)
}

func (mc *MasterCluster) stopNodeLocked(i int) {
	n := mc.nodes[i]
	if n.cmd == nil || n.stopped {
		return
	}
	n.stopped = true
	_ = n.cmd.Process.Signal(os.Interrupt)

	done := make(chan error, 1)
	go func() { done <- n.cmd.Wait() }()
	select {
	case <-time.After(10 * time.Second):
		_ = n.cmd.Process.Kill()
		<-done
	case <-done:
	}
}

// StopAll stops all running master nodes.
func (mc *MasterCluster) StopAll() {
	mc.mu.Lock()
	defer mc.mu.Unlock()
	for i := range 3 {
		mc.stopNodeLocked(i)
	}
	if !mc.keepLogs && !mc.t.Failed() {
		os.RemoveAll(mc.baseDir)
	} else if mc.baseDir != "" {
		mc.t.Logf("multi-master logs kept at %s", mc.baseDir)
	}
}

// NodeURL returns the HTTP URL for node i.
func (mc *MasterCluster) NodeURL(i int) string {
	return fmt.Sprintf("http://127.0.0.1:%d", mc.nodes[i].port)
}

// NodeAddress returns "127.0.0.1:port" for node i.
func (mc *MasterCluster) NodeAddress(i int) string {
	return fmt.Sprintf("127.0.0.1:%d", mc.nodes[i].port)
}

// NodeGRPCAddress returns "127.0.0.1:grpcPort" for node i.
func (mc *MasterCluster) NodeGRPCAddress(i int) string {
	return fmt.Sprintf("127.0.0.1:%d", mc.nodes[i].grpcPort)
}

// IsNodeRunning returns true if the node at index i has a live process.
func (mc *MasterCluster) IsNodeRunning(i int) bool {
	mc.mu.Lock()
	defer mc.mu.Unlock()
	n := mc.nodes[i]
	return n.cmd != nil && !n.stopped
}

// GetClusterStatus fetches /cluster/status from node i.
func (mc *MasterCluster) GetClusterStatus(i int) (*clusterStatus, error) {
	client := &http.Client{Timeout: 2 * time.Second}
	resp, err := client.Get(mc.NodeURL(i) + "/cluster/status")
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	var cs clusterStatus
	if err := json.Unmarshal(body, &cs); err != nil {
		return nil, fmt.Errorf("parse cluster/status: %w (body: %s)", err, string(body))
	}
	return &cs, nil
}

// GetTopologyId fetches the TopologyId from /dir/status on node i.
func (mc *MasterCluster) GetTopologyId(i int) (string, error) {
	client := &http.Client{Timeout: 2 * time.Second}
	resp, err := client.Get(mc.NodeURL(i) + "/dir/status")
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	var raw map[string]any
	if err := json.Unmarshal(body, &raw); err != nil {
		return "", err
	}
	if id, ok := raw["TopologyId"].(string); ok {
		return id, nil
	}
	return "", nil
}

// FindLeader returns the index of the leader node and its address.
// Returns -1 if no leader is found.
func (mc *MasterCluster) FindLeader() (int, string) {
	for i := range 3 {
		if !mc.IsNodeRunning(i) {
			continue
		}
		cs, err := mc.GetClusterStatus(i)
		if err != nil {
			continue
		}
		if cs.IsLeader {
			return i, mc.NodeAddress(i)
		}
	}
	return -1, ""
}

// WaitForLeader polls until a leader is elected or timeout.
func (mc *MasterCluster) WaitForLeader(timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if idx, _ := mc.FindLeader(); idx >= 0 {
			return nil
		}
		time.Sleep(waitTick)
	}
	return fmt.Errorf("no leader elected within %v", timeout)
}
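// demonstrateQuorumLoss sketches how a test might verify that the cluster
// loses its leader once a majority of nodes is down. The expectation that
// FindLeader eventually reports no leader after stopping two of three
// nodes is an assumption about raft quorum behavior, not something this
// framework enforces; the helper is illustrative and not called here.
func demonstrateQuorumLoss(t *testing.T, mc *MasterCluster) {
	mc.StopNode(1)
	mc.StopNode(2)
	// With only 1 of 3 masters alive there is no raft majority, so the
	// survivor should eventually stop reporting itself as leader.
	deadline := time.Now().Add(waitTimeout)
	for time.Now().Before(deadline) {
		if idx, _ := mc.FindLeader(); idx < 0 {
			return
		}
		time.Sleep(waitTick)
	}
	t.Fatal("leader still reported after losing quorum")
}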
// WaitForNewLeader waits for a leader that is different from the given address.
func (mc *MasterCluster) WaitForNewLeader(oldLeaderAddr string, timeout time.Duration) (int, string, error) {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		idx, addr := mc.FindLeader()
		if idx >= 0 && addr != oldLeaderAddr {
			return idx, addr, nil
		}
		time.Sleep(waitTick)
	}
	return -1, "", fmt.Errorf("no new leader (different from %s) within %v", oldLeaderAddr, timeout)
}

// WaitForTopologyId waits until the leader reports a non-empty TopologyId.
func (mc *MasterCluster) WaitForTopologyId(timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if idx, _ := mc.FindLeader(); idx >= 0 {
			if id, err := mc.GetTopologyId(idx); err == nil && id != "" {
				return nil
			}
		}
		time.Sleep(waitTick)
	}
	return fmt.Errorf("TopologyId not available within %v", timeout)
}

// WaitForNodeReady waits for node i to respond to HTTP.
func (mc *MasterCluster) WaitForNodeReady(i int, timeout time.Duration) error {
	client := &http.Client{Timeout: 1 * time.Second}
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		resp, err := client.Get(mc.NodeURL(i) + "/cluster/status")
		if err == nil {
			resp.Body.Close()
			return nil
		}
		time.Sleep(waitTick)
	}
	return fmt.Errorf("node %d not ready within %v", i, timeout)
}

// DumpLogs prints the tail of all master logs.
func (mc *MasterCluster) DumpLogs() {
	for i := range 3 {
		mc.t.Logf("=== master%d log tail ===\n%s", i, mc.tailLog(i))
	}
}

func (mc *MasterCluster) tailLog(i int) string {
	f, err := os.Open(mc.nodes[i].logFile)
	if err != nil {
		return "(no log)"
	}
	defer f.Close()

	scanner := bufio.NewScanner(f)
	lines := make([]string, 0, 50)
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
		if len(lines) > 50 {
			lines = lines[1:]
		}
	}
	return strings.Join(lines, "\n")
}
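// demonstrateTopologyIdAgreement sketches a consistency check built from
// GetTopologyId: once the id has been committed through raft, every
// running node should report the same non-empty TopologyId. The helper
// name and the strict-equality expectation are illustrative assumptions;
// nothing in this file calls it.
func demonstrateTopologyIdAgreement(t *testing.T, mc *MasterCluster) {
	want := ""
	for i := range 3 {
		if !mc.IsNodeRunning(i) {
			continue
		}
		id, err := mc.GetTopologyId(i)
		if err != nil {
			t.Fatalf("node %d: %v", i, err)
		}
		if want == "" {
			want = id // first running node sets the expected value
		}
		if id != want {
			t.Fatalf("node %d reports TopologyId %q, want %q", i, id, want)
		}
	}
	if want == "" {
		t.Fatal("no running node reported a TopologyId")
	}
}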
// --- port and binary helpers (adapted from test/volume_server/framework) ---

// allocateMultipleMasterPortPairs finds n non-overlapping (http, grpc) port
// pairs, holding all listeners until all are found, then releasing them
// together to avoid races between consecutive allocations.
func allocateMultipleMasterPortPairs(n int) ([][2]int, error) {
	var listeners []net.Listener
	var pairs [][2]int
	defer func() {
		for _, l := range listeners {
			l.Close()
		}
	}()

	for masterPort := 10000; masterPort <= 55535 && len(pairs) < n; masterPort++ {
		grpcPort := masterPort + 10000
		l1, err := net.Listen("tcp", net.JoinHostPort("127.0.0.1", strconv.Itoa(masterPort)))
		if err != nil {
			continue
		}
		l2, err := net.Listen("tcp", net.JoinHostPort("127.0.0.1", strconv.Itoa(grpcPort)))
		if err != nil {
			l1.Close()
			continue
		}
		listeners = append(listeners, l1, l2)
		pairs = append(pairs, [2]int{masterPort, grpcPort})
	}
	if len(pairs) < n {
		return nil, fmt.Errorf("could only allocate %d of %d master port pairs", len(pairs), n)
	}
	return pairs, nil
}

func findOrBuildWeedBinary() (string, error) {
	if fromEnv := os.Getenv("WEED_BINARY"); fromEnv != "" {
		if isExecutableFile(fromEnv) {
			return fromEnv, nil
		}
		return "", fmt.Errorf("WEED_BINARY not executable: %s", fromEnv)
	}

	repoRoot := ""
	if _, file, _, ok := runtime.Caller(0); ok {
		repoRoot = filepath.Clean(filepath.Join(filepath.Dir(file), "..", ".."))
	}
	if repoRoot == "" {
		return "", fmt.Errorf("unable to detect repository root")
	}

	// Reuse a previously built binary if present. Note that a stale cached
	// binary is reused as-is; delete it or set WEED_BINARY to force a rebuild.
	binDir := filepath.Join(os.TempDir(), "seaweedfs_multi_master_it_bin")
	if err := os.MkdirAll(binDir, 0o755); err != nil {
		return "", err
	}
	binPath := filepath.Join(binDir, "weed")
	if isExecutableFile(binPath) {
		return binPath, nil
	}

	cmd := exec.Command("go", "build", "-o", binPath, ".")
	cmd.Dir = filepath.Join(repoRoot, "weed")
	var out bytes.Buffer
	cmd.Stdout = &out
	cmd.Stderr = &out
	if err := cmd.Run(); err != nil {
		return "", fmt.Errorf("build weed binary: %w\n%s", err, out.String())
	}
	return binPath, nil
}

func isExecutableFile(path string) bool {
	info, err := os.Stat(path)
	if err != nil || info.IsDir() {
		return false
	}
	return info.Mode().Perm()&0o111 != 0
}
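// demonstrateNodeRestart sketches the stop/restart flow for a single node:
// because StartNode reuses the node's original ports and -mdir, the
// restarted process rejoins the existing cluster. Restarting node 0
// specifically is an arbitrary illustrative choice, and this helper is
// not called by the framework.
func demonstrateNodeRestart(t *testing.T, mc *MasterCluster) {
	mc.StopNode(0)
	mc.StartNode(0) // same ports and data dir, so it rejoins as a peer
	if err := mc.WaitForNodeReady(0, waitTimeout); err != nil {
		t.Fatal(err)
	}
	if err := mc.WaitForLeader(waitTimeout); err != nil {
		t.Fatal(err)
	}
}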