|
|
|
@ -2,6 +2,7 @@ package weed_server |
|
|
|
|
|
|
|
import ( |
|
|
|
"context" |
|
|
|
"encoding/json" |
|
|
|
"fmt" |
|
|
|
"net/http" |
|
|
|
"net/http/httputil" |
|
|
|
@ -40,6 +41,7 @@ import ( |
|
|
|
const ( |
|
|
|
SequencerType = "master.sequencer.type" |
|
|
|
SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" |
|
|
|
raftApplyTimeout = 1 * time.Second |
|
|
|
) |
|
|
|
|
|
|
|
type MasterOption struct { |
|
|
|
@ -227,6 +229,7 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { |
|
|
|
|
|
|
|
if ms.Topo.IsLeader() { |
|
|
|
glog.V(0).Infof("%s I am the leader!", raftServerName) |
|
|
|
go ms.ensureTopologyId() |
|
|
|
} else { |
|
|
|
var raftServerLeader string |
|
|
|
ms.Topo.RaftServerAccessLock.RLock() |
|
|
|
@ -242,6 +245,26 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (ms *MasterServer) syncRaftForTopologyId(topologyId string) error { |
|
|
|
ms.Topo.RaftServerAccessLock.RLock() |
|
|
|
defer ms.Topo.RaftServerAccessLock.RUnlock() |
|
|
|
|
|
|
|
if ms.Topo.RaftServer != nil { |
|
|
|
_, err := ms.Topo.RaftServer.Do(topology.NewMaxVolumeIdCommand(ms.Topo.GetMaxVolumeId(), topologyId)) |
|
|
|
return err |
|
|
|
} else if ms.Topo.HashicorpRaft != nil { |
|
|
|
b, err := json.Marshal(topology.NewMaxVolumeIdCommand(ms.Topo.GetMaxVolumeId(), topologyId)) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("failed marshal NewMaxVolumeIdCommand: %v", err) |
|
|
|
} |
|
|
|
if future := ms.Topo.HashicorpRaft.Apply(b, raftApplyTimeout); future.Error() != nil { |
|
|
|
return future.Error() |
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
|
return fmt.Errorf("no raft server configured") |
|
|
|
} |
|
|
|
|
|
|
|
func (ms *MasterServer) ensureTopologyId() { |
|
|
|
ms.topologyIdGenLock.Lock() |
|
|
|
defer ms.topologyIdGenLock.Unlock() |
|
|
|
@ -254,7 +277,7 @@ func (ms *MasterServer) ensureTopologyId() { |
|
|
|
glog.V(1).Infof("lost leadership while sending barrier command for topologyId") |
|
|
|
return |
|
|
|
} |
|
|
|
if _, err := ms.Topo.RaftServer.Do(topology.NewMaxVolumeIdCommand(ms.Topo.GetMaxVolumeId(), ms.Topo.GetTopologyId())); err != nil { |
|
|
|
if err := ms.syncRaftForTopologyId(ms.Topo.GetTopologyId()); err != nil { |
|
|
|
glog.Errorf("failed to sync raft for topologyId: %v, retrying in 1s", err) |
|
|
|
time.Sleep(time.Second) |
|
|
|
continue |
|
|
|
@ -273,8 +296,7 @@ func (ms *MasterServer) ensureTopologyId() { |
|
|
|
EnsureTopologyId(ms.Topo, func() bool { |
|
|
|
return ms.Topo.IsLeader() |
|
|
|
}, func(topologyId string) error { |
|
|
|
_, err := ms.Topo.RaftServer.Do(topology.NewMaxVolumeIdCommand(ms.Topo.GetMaxVolumeId(), topologyId)) |
|
|
|
return err |
|
|
|
return ms.syncRaftForTopologyId(topologyId) |
|
|
|
}) |
|
|
|
} |
|
|
|
|
|
|
|
|