Browse Source

cleaner cluster messages

pull/2/head
Chris Lu 11 years ago
parent
commit
ef4c2c0d1e
  1. 2
      go/topology/topology.go
  2. 4
      go/topology/topology_event_handling.go
  3. 11
      go/weed/weed_server/master_server.go
  4. 4
      go/weed/weed_server/raft_server.go

2
go/topology/topology.go

@ -12,6 +12,8 @@ import (
type Topology struct { type Topology struct {
NodeImpl NodeImpl
IsLeader bool
collectionMap map[string]*Collection collectionMap map[string]*Collection
pulse int64 pulse int64

4
go/topology/topology_event_handling.go

@ -10,16 +10,20 @@ import (
func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
go func() { go func() {
for { for {
if t.IsLeader {
freshThreshHold := time.Now().Unix() - 3*t.pulse //3 times of sleep interval freshThreshHold := time.Now().Unix() - 3*t.pulse //3 times of sleep interval
t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit) t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit)
}
time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond) time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond)
} }
}() }()
go func(garbageThreshold string) { go func(garbageThreshold string) {
c := time.Tick(15 * time.Minute) c := time.Tick(15 * time.Minute)
if t.IsLeader {
for _ = range c { for _ = range c {
t.Vacuum(garbageThreshold) t.Vacuum(garbageThreshold)
} }
}
}(garbageThreshold) }(garbageThreshold)
go func() { go func() {
for { for {

11
go/weed/weed_server/master_server.go

@ -5,6 +5,7 @@ import (
"code.google.com/p/weed-fs/go/replication" "code.google.com/p/weed-fs/go/replication"
"code.google.com/p/weed-fs/go/sequence" "code.google.com/p/weed-fs/go/sequence"
"code.google.com/p/weed-fs/go/topology" "code.google.com/p/weed-fs/go/topology"
"github.com/goraft/raft"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"net/http" "net/http"
"net/http/httputil" "net/http/httputil"
@ -72,6 +73,16 @@ func NewMasterServer(r *mux.Router, version string, port int, metaFolder string,
func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
ms.raftServer = raftServer ms.raftServer = raftServer
ms.raftServer.raftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
ms.topo.IsLeader = ms.IsLeader()
glog.V(0).Infoln("[", ms.raftServer.Name(), "]", ms.raftServer.Leader(), "becomes leader.")
})
ms.topo.IsLeader = ms.IsLeader()
if ms.topo.IsLeader {
glog.V(0).Infoln("[", ms.raftServer.Name(), "]", "I am the leader!")
} else {
glog.V(0).Infoln("[", ms.raftServer.Name(), "]", ms.raftServer.Leader(), "is the leader.")
}
} }
func (ms *MasterServer) IsLeader() bool { func (ms *MasterServer) IsLeader() bool {

4
go/weed/weed_server/raft_server.go

@ -87,6 +87,10 @@ func NewRaftServer(r *mux.Router, version string, peers []string, httpAddr strin
return s return s
} }
func (s *RaftServer) Name() string {
return s.raftServer.Name()
}
func (s *RaftServer) IsLeader() bool { func (s *RaftServer) IsLeader() bool {
return s.Leader() == s.raftServer.Name() return s.Leader() == s.raftServer.Name()
} }

Loading…
Cancel
Save