
fix some data races detected by the Go race detector

pull/279/head
tnextday committed 10 years ago · commit 37cacb06d0
10 changed files:

1. Makefile (2)
2. go/topology/data_center.go (8)
3. go/topology/data_node.go (33)
4. go/topology/node.go (6)
5. go/topology/rack.go (12)
6. go/topology/topology.go (32)
7. go/topology/volume_location_list.go (4)
8. go/weed/weed_server/master_server.go (22)
9. go/weed/weed_server/master_server_handlers_admin.go (2)
10. go/weed/weed_server/master_server_handlers_ui.go (4)

Makefile (2)

@@ -1,6 +1,6 @@
 BINARY = weed

-GO_FLAGS = #-v
+GO_FLAGS = -race #-v
 SOURCE_DIR = ./go/weed/

 all: build
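Building with `-race` compiles weed with Go's race detector, which instruments memory accesses at runtime and reports any read/write pair that happens concurrently without synchronization; this is how the races fixed below surface. A minimal sketch (hypothetical file, not part of this commit) of the kind of access it flags:

```go
// race_demo.go: run with `go run -race race_demo.go` to see a race report.
package main

import (
	"fmt"
	"time"
)

func main() {
	dead := false
	go func() {
		dead = true // write from a background goroutine
	}()
	time.Sleep(time.Millisecond)
	fmt.Println(dead) // unsynchronized read from main; -race flags the pair
}
```

Note that `-race` adds substantial CPU and memory overhead, so it is usually enabled for tests and debug builds rather than release binaries.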

go/topology/data_center.go (8)

@@ -14,11 +14,11 @@ func NewDataCenter(id string) *DataCenter {
 }

 func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack {
-	rack := dc.GetChildren(NodeId(rackName)).(*Rack)
-	if rack != nil {
-		return rack
+	n := dc.GetChildren(NodeId(rackName))
+	if n != nil {
+		return n.(*Rack)
 	}
-	rack = NewRack(rackName)
+	rack := NewRack(rackName)
 	dc.LinkChildNode(rack)
 	return rack
 }
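The old code asserted the result of GetChildren before checking it: when no rack exists, GetChildren returns a nil Node, and the single-result assertion `.(*Rack)` panics on a nil interface, so the `rack != nil` guard was unreachable. The fix tests the interface first and only then asserts. The comma-ok assertion form would be an equivalent guard; a minimal sketch of the same body, assuming GetChildren returns a nil Node on a miss:

```go
// Comma-ok form: yields (nil, false) instead of panicking when the
// interface value is nil, so the miss path falls through to creation.
if rack, ok := dc.GetChildren(NodeId(rackName)).(*Rack); ok {
	return rack
}
rack := NewRack(rackName)
dc.LinkChildNode(rack)
return rack
```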

go/topology/data_node.go (33)

@@ -7,16 +7,17 @@ import (
 	"github.com/chrislusf/seaweedfs/go/glog"
 	"github.com/chrislusf/seaweedfs/go/storage"
 	"net"
+	"time"
 )

 type DataNode struct {
 	NodeImpl
 	volumes   map[storage.VolumeId]*storage.VolumeInfo
+	lastSeen  int64 // unix time in seconds
+	dead      bool
 	Ip        string
 	Port      int
 	PublicUrl string
-	LastSeen  int64 // unix time in seconds
-	Dead      bool
 }

 func NewDataNode(id string) *DataNode {
@@ -29,7 +30,33 @@ func NewDataNode(id string) *DataNode {
 }

 func (dn *DataNode) String() string {
-	return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead)
+	dn.mutex.RLock()
+	defer dn.mutex.RUnlock()
+	return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.dead)
+}
+
+func (dn *DataNode) LastSeen() int64 {
+	dn.mutex.RLock()
+	defer dn.mutex.RUnlock()
+	return dn.lastSeen
+}
+
+func (dn *DataNode) UpdateLastSeen() {
+	dn.mutex.Lock()
+	defer dn.mutex.Unlock()
+	dn.lastSeen = time.Now().Unix()
+}
+
+func (dn *DataNode) IsDead() bool {
+	dn.mutex.RLock()
+	defer dn.mutex.RUnlock()
+	return dn.dead
+}
+
+func (dn *DataNode) SetDead(b bool) {
+	dn.mutex.Lock()
+	defer dn.mutex.Unlock()
+	dn.dead = b
 }

 func (dn *DataNode) AddOrUpdateVolume(v *storage.VolumeInfo) {
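This is the core of the race fix: LastSeen and Dead become the unexported lastSeen and dead, and every access goes through an accessor that holds the node's mutex (presumably a sync.RWMutex on the embedded NodeImpl), RLock for readers and Lock for writers. A self-contained sketch of the same pattern with the mutex made explicit:

```go
package main

import (
	"sync"
	"time"
)

// guardedNode stands in for DataNode: mutable state is unexported and
// only touched while holding the RWMutex.
type guardedNode struct {
	mutex    sync.RWMutex
	lastSeen int64
}

func (g *guardedNode) LastSeen() int64 {
	g.mutex.RLock() // shared lock: many readers may hold it at once
	defer g.mutex.RUnlock()
	return g.lastSeen
}

func (g *guardedNode) UpdateLastSeen() {
	g.mutex.Lock() // exclusive lock for the write
	defer g.mutex.Unlock()
	g.lastSeen = time.Now().Unix()
}

func main() {
	g := &guardedNode{}
	go g.UpdateLastSeen() // writer goroutine
	_ = g.LastSeen()      // concurrent reader: clean under -race
	time.Sleep(time.Millisecond)
}
```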

go/topology/node.go (6)

@@ -326,9 +326,9 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi
 	if n.IsRack() {
 		for _, c := range n.Children() {
 			dn := c.(*DataNode) //can not cast n to DataNode
-			if dn.LastSeen < freshThreshHold {
-				if !dn.Dead {
-					dn.Dead = true
+			if dn.LastSeen() < freshThreshHold {
+				if !dn.IsDead() {
+					dn.SetDead(true)
 					n.GetTopology().chanDeadDataNodes <- dn
 				}
 			}
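One caveat: `IsDead()` and `SetDead(true)` are separate critical sections, so the race detector is satisfied, but two goroutines scanning the same node could in principle both observe false and both send it to chanDeadDataNodes. A hypothetical accessor (not in this commit) that folds the check and the write into one critical section would close that window:

```go
// MarkDead reports whether this call performed the false-to-true
// transition, so at most one caller enqueues the node as dead.
func (dn *DataNode) MarkDead() bool {
	dn.mutex.Lock()
	defer dn.mutex.Unlock()
	if dn.dead {
		return false
	}
	dn.dead = true
	return true
}
```

The caller would then read `if dn.LastSeen() < freshThreshHold && dn.MarkDead() { ... }`.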

go/topology/rack.go (12)

@@ -3,7 +3,6 @@ package topology
 import (
 	"net"
 	"strconv"
-	"time"
 )

 type Rack struct {
@@ -24,14 +23,17 @@ func (r *Rack) FindDataNode(ip string, port int) *DataNode {
 		dn := c.(*DataNode)
 		return dn.MatchLocation(ip, port)
 	})
+	if n == nil {
+		return nil
+	}
 	return n.(*DataNode)
 }

 func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode {
 	if dn := r.FindDataNode(ip, port); dn != nil {
-		dn.LastSeen = time.Now().Unix()
-		if dn.Dead {
-			dn.Dead = false
+		dn.UpdateLastSeen()
+		if dn.IsDead() {
+			dn.SetDead(false)
 			r.GetTopology().chanRecoveredDataNodes <- dn
 			dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount)
 		}
@@ -48,7 +50,7 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol
 	}
 	dn.PublicUrl = publicUrl
 	dn.maxVolumeCount = maxVolumeCount
-	dn.LastSeen = time.Now().Unix()
+	dn.UpdateLastSeen()
 	r.LinkChildNode(dn)
 	return dn
 }

go/topology/topology.go (32)

@@ -22,7 +22,7 @@ type Topology struct {
 	Sequence sequence.Sequencer
 	CollectionSettings *storage.CollectionSettings
 	configuration *Configuration
-	RaftServer raft.Server
+	raftServer raft.Server

 	chanDeadDataNodes chan *DataNode
 	chanRecoveredDataNodes chan *DataNode
@@ -51,24 +51,36 @@ func NewTopology(id string, confFile string, cs *storage.CollectionSettings, seq
 	return t, err
 }

+func (t *Topology) GetRaftServer() raft.Server {
+	t.mutex.RLock()
+	defer t.mutex.RUnlock()
+	return t.raftServer
+}
+
+func (t *Topology) SetRaftServer(s raft.Server) {
+	t.mutex.Lock()
+	defer t.mutex.Unlock()
+	t.raftServer = s
+}
+
 func (t *Topology) IsLeader() bool {
 	if leader, e := t.Leader(); e == nil {
-		return leader == t.RaftServer.Name()
+		return leader == t.GetRaftServer().Name()
 	}
 	return false
 }

 func (t *Topology) Leader() (string, error) {
 	l := ""
-	if t.RaftServer != nil {
-		l = t.RaftServer.Leader()
+	if t.GetRaftServer() != nil {
+		l = t.GetRaftServer().Leader()
 	} else {
 		return "", errors.New("Raft Server not ready yet!")
 	}

 	if l == "" {
 		// We are a single node cluster, we are the leader
-		return t.RaftServer.Name(), errors.New("Raft Server not initialized!")
+		return t.GetRaftServer().Name(), errors.New("Raft Server not initialized!")
 	}

 	return l, nil
@@ -106,7 +118,7 @@ func (t *Topology) Lookup(collection string, vid storage.VolumeId) (vl *VolumeLo
 func (t *Topology) NextVolumeId() storage.VolumeId {
 	vid := t.GetMaxVolumeId()
 	next := vid.Next()
-	go t.RaftServer.Do(NewMaxVolumeIdCommand(next))
+	go t.GetRaftServer().Do(NewMaxVolumeIdCommand(next))
 	return next
 }

@@ -179,11 +191,11 @@ func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
 }

 func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
-	dc := t.GetChildren(NodeId(dcName)).(*DataCenter)
-	if dc != nil {
-		return dc
+	n := t.GetChildren(NodeId(dcName))
+	if n != nil {
+		return n.(*DataCenter)
 	}
-	dc = NewDataCenter(dcName)
+	dc := NewDataCenter(dcName)
 	t.LinkChildNode(dc)
 	return dc
 }
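Here raftServer is written once, from MasterServer.SetRaftServer below, and read on every request, so an RWMutex-guarded getter/setter pair is a straightforward fit. For a write-once, read-mostly field like this, `sync/atomic.Value` is a common lighter-weight alternative; a minimal sketch, with a local interface standing in for raft.Server to keep it self-contained:

```go
package main

import "sync/atomic"

// Server stands in for raft.Server from the project's raft dependency.
type Server interface{ Leader() string }

type Topology struct {
	raftServer atomic.Value // stored once at startup, loaded on every request
}

func (t *Topology) SetRaftServer(s Server) { t.raftServer.Store(s) }

func (t *Topology) GetRaftServer() Server {
	if v := t.raftServer.Load(); v != nil {
		return v.(Server)
	}
	return nil
}

func main() {}
```

Either way, note how Leader() above calls GetRaftServer() again after the nil test; that is benign only because the field is never set back to nil once assigned.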

go/topology/volume_location_list.go (4)

@@ -65,7 +65,7 @@ func (dnll *VolumeLocationList) Remove(loc *DataNode) bool {
 func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) {
 	var changed bool
 	for _, dnl := range dnll.list {
-		if dnl.LastSeen < freshThreshHold {
+		if dnl.LastSeen() < freshThreshHold {
 			changed = true
 			break
 		}
@@ -73,7 +73,7 @@ func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) {
 	if changed {
 		var l []*DataNode
 		for _, dnl := range dnll.list {
-			if dnl.LastSeen >= freshThreshHold {
+			if dnl.LastSeen() >= freshThreshHold {
 				l = append(l, dnl)
 			}
 		}

go/weed/weed_server/master_server.go (22)

@@ -86,17 +86,17 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
 }

 func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
-	ms.Topo.RaftServer = raftServer.raftServer
-	ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
-		if ms.Topo.RaftServer.Leader() != "" {
-			glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
+	ms.Topo.SetRaftServer(raftServer.raftServer)
+	ms.Topo.GetRaftServer().AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
+		if ms.Topo.GetRaftServer().Leader() != "" {
+			glog.V(0).Infoln("[", ms.Topo.GetRaftServer().Name(), "]", ms.Topo.GetRaftServer().Leader(), "becomes leader.")
 		}
 	})
 	if ms.Topo.IsLeader() {
-		glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!")
+		glog.V(0).Infoln("[", ms.Topo.GetRaftServer().Name(), "]", "I am the leader!")
 	} else {
-		if ms.Topo.RaftServer.Leader() != "" {
-			glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.")
+		if ms.Topo.GetRaftServer().Leader() != "" {
+			glog.V(0).Infoln("[", ms.Topo.GetRaftServer().Name(), "]", ms.Topo.GetRaftServer().Leader(), "is the leader.")
 		}
 	}
 }
@@ -105,16 +105,16 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ
 	return func(w http.ResponseWriter, r *http.Request) {
 		if ms.Topo.IsLeader() {
 			f(w, r)
-		} else if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" {
+		} else if ms.Topo.GetRaftServer() != nil && ms.Topo.GetRaftServer().Leader() != "" {
 			ms.bounedLeaderChan <- 1
 			defer func() { <-ms.bounedLeaderChan }()
-			targetUrl, err := url.Parse("http://" + ms.Topo.RaftServer.Leader())
+			targetUrl, err := url.Parse("http://" + ms.Topo.GetRaftServer().Leader())
 			if err != nil {
 				writeJsonError(w, r, http.StatusInternalServerError,
-					fmt.Errorf("Leader URL http://%s Parse Error: %v", ms.Topo.RaftServer.Leader(), err))
+					fmt.Errorf("Leader URL http://%s Parse Error: %v", ms.Topo.GetRaftServer().Leader(), err))
 				return
 			}
-			glog.V(4).Infoln("proxying to leader", ms.Topo.RaftServer.Leader())
+			glog.V(4).Infoln("proxying to leader", ms.Topo.GetRaftServer().Leader())
 			proxy := httputil.NewSingleHostReverseProxy(targetUrl)
 			director := proxy.Director
 			proxy.Director = func(req *http.Request) {
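In proxyToLeader, `bounedLeaderChan` is a buffered channel (presumably created with a fixed capacity in NewMasterServer) used as a counting semaphore: the send acquires a slot before proxying and the deferred receive releases it, capping how many requests a non-leader will proxy to the leader at once. A minimal sketch of the pattern:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	sem := make(chan int, 3) // buffered channel as counting semaphore: 3 slots
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			sem <- 1                 // acquire: blocks while 3 requests are in flight
			defer func() { <-sem }() // release on return
			fmt.Println("proxying request", i)
		}(i)
	}
	wg.Wait()
}
```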

go/weed/weed_server/master_server_handlers_admin.go (2)

@@ -171,7 +171,7 @@ func (ms *MasterServer) deleteFromMasterServerHandler(w http.ResponseWriter, r *
 	if ms.Topo.IsLeader() {
 		deleteForClientHandler(w, r, ms.selfUrl(r))
 	} else {
-		deleteForClientHandler(w, r, ms.Topo.RaftServer.Leader())
+		deleteForClientHandler(w, r, ms.Topo.GetRaftServer().Leader())
 	}
 }

go/weed/weed_server/master_server_handlers_ui.go (4)

@@ -21,8 +21,8 @@ func (ms *MasterServer) uiStatusHandler(w http.ResponseWriter, r *http.Request)
 	}{
 		util.VERSION,
 		ms.Topo.ToMap(),
-		ms.Topo.RaftServer.Leader(),
-		ms.Topo.RaftServer.Peers(),
+		ms.Topo.GetRaftServer().Leader(),
+		ms.Topo.GetRaftServer().Peers(),
 		infos,
 		serverStats,
 	}
