From ef4c2c0d1e5bb45a63cde703013871daa401d1ef Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 9 Feb 2014 23:37:29 -0800 Subject: [PATCH] cleaner cluster messages --- go/topology/topology.go | 2 ++ go/topology/topology_event_handling.go | 12 ++++++++---- go/weed/weed_server/master_server.go | 11 +++++++++++ go/weed/weed_server/raft_server.go | 4 ++++ 4 files changed, 25 insertions(+), 4 deletions(-) diff --git a/go/topology/topology.go b/go/topology/topology.go index 5b3d29e0b..24b3ab337 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -12,6 +12,8 @@ import ( type Topology struct { NodeImpl + IsLeader bool + collectionMap map[string]*Collection pulse int64 diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go index 7f81d8184..5097e9874 100644 --- a/go/topology/topology_event_handling.go +++ b/go/topology/topology_event_handling.go @@ -10,15 +10,19 @@ import ( func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { go func() { for { - freshThreshHold := time.Now().Unix() - 3*t.pulse //3 times of sleep interval - t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit) + if t.IsLeader { + freshThreshHold := time.Now().Unix() - 3*t.pulse //3 times of sleep interval + t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit) + } time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond) } }() go func(garbageThreshold string) { c := time.Tick(15 * time.Minute) - for _ = range c { - t.Vacuum(garbageThreshold) + if t.IsLeader { + for _ = range c { + t.Vacuum(garbageThreshold) + } } }(garbageThreshold) go func() { diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go index d1f7914f1..738484ff0 100644 --- a/go/weed/weed_server/master_server.go +++ b/go/weed/weed_server/master_server.go @@ -5,6 +5,7 @@ import ( "code.google.com/p/weed-fs/go/replication" "code.google.com/p/weed-fs/go/sequence" "code.google.com/p/weed-fs/go/topology" + "github.com/goraft/raft" "github.com/gorilla/mux" "net/http" "net/http/httputil" @@ -72,6 +73,16 @@ func NewMasterServer(r *mux.Router, version string, port int, metaFolder string, func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { ms.raftServer = raftServer + ms.raftServer.raftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) { + ms.topo.IsLeader = ms.IsLeader() + glog.V(0).Infoln("[", ms.raftServer.Name(), "]", ms.raftServer.Leader(), "becomes leader.") + }) + ms.topo.IsLeader = ms.IsLeader() + if ms.topo.IsLeader { + glog.V(0).Infoln("[", ms.raftServer.Name(), "]", "I am the leader!") + } else { + glog.V(0).Infoln("[", ms.raftServer.Name(), "]", ms.raftServer.Leader(), "is the leader.") + } } func (ms *MasterServer) IsLeader() bool { diff --git a/go/weed/weed_server/raft_server.go b/go/weed/weed_server/raft_server.go index a44936413..b481df68d 100644 --- a/go/weed/weed_server/raft_server.go +++ b/go/weed/weed_server/raft_server.go @@ -87,6 +87,10 @@ func NewRaftServer(r *mux.Router, version string, peers []string, httpAddr strin return s } +func (s *RaftServer) Name() string { + return s.raftServer.Name() +} + func (s *RaftServer) IsLeader() bool { return s.Leader() == s.raftServer.Name() }