diff --git a/weed/server/master_server.go b/weed/server/master_server.go index a95f4bcb6..c4ddc7aa2 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -2,6 +2,7 @@ package weed_server import ( "context" + "encoding/json" "fmt" "net/http" "net/http/httputil" @@ -40,6 +41,7 @@ import ( const ( SequencerType = "master.sequencer.type" SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" + raftApplyTimeout = 1 * time.Second ) type MasterOption struct { @@ -227,6 +229,7 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { if ms.Topo.IsLeader() { glog.V(0).Infof("%s I am the leader!", raftServerName) + go ms.ensureTopologyId() } else { var raftServerLeader string ms.Topo.RaftServerAccessLock.RLock() @@ -242,6 +245,26 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { } } +func (ms *MasterServer) syncRaftForTopologyId(topologyId string) error { + ms.Topo.RaftServerAccessLock.RLock() + defer ms.Topo.RaftServerAccessLock.RUnlock() + + if ms.Topo.RaftServer != nil { + _, err := ms.Topo.RaftServer.Do(topology.NewMaxVolumeIdCommand(ms.Topo.GetMaxVolumeId(), topologyId)) + return err + } else if ms.Topo.HashicorpRaft != nil { + b, err := json.Marshal(topology.NewMaxVolumeIdCommand(ms.Topo.GetMaxVolumeId(), topologyId)) + if err != nil { + return fmt.Errorf("failed marshal NewMaxVolumeIdCommand: %v", err) + } + if future := ms.Topo.HashicorpRaft.Apply(b, raftApplyTimeout); future.Error() != nil { + return future.Error() + } + return nil + } + return fmt.Errorf("no raft server configured") +} + func (ms *MasterServer) ensureTopologyId() { ms.topologyIdGenLock.Lock() defer ms.topologyIdGenLock.Unlock() @@ -254,7 +277,7 @@ func (ms *MasterServer) ensureTopologyId() { glog.V(1).Infof("lost leadership while sending barrier command for topologyId") return } - if _, err := ms.Topo.RaftServer.Do(topology.NewMaxVolumeIdCommand(ms.Topo.GetMaxVolumeId(), ms.Topo.GetTopologyId())); err != nil { + if err := ms.syncRaftForTopologyId(ms.Topo.GetTopologyId()); err != nil { glog.Errorf("failed to sync raft for topologyId: %v, retrying in 1s", err) time.Sleep(time.Second) continue @@ -273,8 +296,7 @@ func (ms *MasterServer) ensureTopologyId() { EnsureTopologyId(ms.Topo, func() bool { return ms.Topo.IsLeader() }, func(topologyId string) error { - _, err := ms.Topo.RaftServer.Do(topology.NewMaxVolumeIdCommand(ms.Topo.GetMaxVolumeId(), topologyId)) - return err + return ms.syncRaftForTopologyId(topologyId) }) } diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index b2f1aaf17..322a41608 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -51,6 +51,7 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) m["Version"] = version.Version() + m["TopologyId"] = ms.Topo.GetTopologyId() m["Topology"] = ms.Topo.ToInfo() writeJsonQuiet(w, r, http.StatusOK, m) }