From a55cb02e12bd0212944d700e70679e87ecc5f795 Mon Sep 17 00:00:00 2001 From: tnextday Date: Sat, 12 Mar 2016 21:23:19 +0800 Subject: [PATCH] fix some data race problem detect by go race --- go/storage/needle_map.go | 93 ++++++++++++++++++-------------- go/storage/needle_map_boltdb.go | 2 +- go/storage/needle_map_leveldb.go | 2 +- go/storage/needle_map_memory.go | 22 ++++---- go/storage/store.go | 44 ++++++++++++--- 5 files changed, 102 insertions(+), 61 deletions(-) diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go index 814789616..d594dd348 100644 --- a/go/storage/needle_map.go +++ b/go/storage/needle_map.go @@ -35,12 +35,17 @@ type NeedleMapper interface { type baseNeedleMapper struct { indexFile *os.File - indexFileAccessLock sync.Mutex - - mapMetric + mutex sync.RWMutex + deletionCounter int `json:"DeletionCounter"` + fileCounter int `json:"FileCounter"` + deletionByteCounter uint64 `json:"DeletionByteCounter"` + fileByteCounter uint64 `json:"FileByteCounter"` + maximumFileKey uint64 `json:"MaxFileKey"` } -func (nm baseNeedleMapper) IndexFileSize() uint64 { +func (nm *baseNeedleMapper) IndexFileSize() uint64 { + nm.mutex.RLock() + defer nm.mutex.RUnlock() stat, err := nm.indexFile.Stat() if err == nil { return uint64(stat.Size()) @@ -48,7 +53,9 @@ func (nm baseNeedleMapper) IndexFileSize() uint64 { return 0 } -func (nm baseNeedleMapper) IndexFileName() string { +func (nm *baseNeedleMapper) IndexFileName() string { + nm.mutex.RLock() + defer nm.mutex.RUnlock() return nm.indexFile.Name() } @@ -58,14 +65,14 @@ func idxFileEntry(bytes []byte) (key uint64, offset uint32, size uint32) { size = util.BytesToUint32(bytes[12:16]) return } -func (nm baseNeedleMapper) appendToIndexFile(key uint64, offset uint32, size uint32) error { +func (nm *baseNeedleMapper) appendToIndexFile(key uint64, offset uint32, size uint32) error { bytes := make([]byte, 16) util.Uint64toBytes(bytes[0:8], key) util.Uint32toBytes(bytes[8:12], offset) util.Uint32toBytes(bytes[12:16], size) - nm.indexFileAccessLock.Lock() - defer nm.indexFileAccessLock.Unlock() + nm.mutex.Lock() + defer nm.mutex.Unlock() if _, err := nm.indexFile.Seek(0, 2); err != nil { return fmt.Errorf("cannot seek end of indexfile %s: %v", nm.indexFile.Name(), err) @@ -73,51 +80,55 @@ func (nm baseNeedleMapper) appendToIndexFile(key uint64, offset uint32, size uin _, err := nm.indexFile.Write(bytes) return err } -func (nm baseNeedleMapper) IndexFileContent() ([]byte, error) { - nm.indexFileAccessLock.Lock() - defer nm.indexFileAccessLock.Unlock() +func (nm *baseNeedleMapper) IndexFileContent() ([]byte, error) { + nm.mutex.RLock() + defer nm.mutex.RUnlock() return ioutil.ReadFile(nm.indexFile.Name()) } -type mapMetric struct { - indexFile *os.File - - DeletionCounter int `json:"DeletionCounter"` - FileCounter int `json:"FileCounter"` - DeletionByteCounter uint64 `json:"DeletionByteCounter"` - FileByteCounter uint64 `json:"FileByteCounter"` - MaximumFileKey uint64 `json:"MaxFileKey"` -} - -func (mm *mapMetric) logDelete(deletedByteCount uint32) { - mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(deletedByteCount) - mm.DeletionCounter++ +func (nm *baseNeedleMapper) logDelete(deletedByteCount uint32) { + nm.mutex.Lock() + defer nm.mutex.Unlock() + nm.deletionByteCounter = nm.deletionByteCounter + uint64(deletedByteCount) + nm.deletionCounter++ } -func (mm *mapMetric) logPut(key uint64, oldSize uint32, newSize uint32) { - if key > mm.MaximumFileKey { - mm.MaximumFileKey = key +func (nm *baseNeedleMapper) logPut(key uint64, oldSize uint32, newSize uint32) { + nm.mutex.Lock() + defer nm.mutex.Unlock() + if key > nm.maximumFileKey { + nm.maximumFileKey = key } - mm.FileCounter++ - mm.FileByteCounter = mm.FileByteCounter + uint64(newSize) + nm.fileCounter++ + nm.fileByteCounter = nm.fileByteCounter + uint64(newSize) if oldSize > 0 { - mm.DeletionCounter++ - mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(oldSize) + nm.deletionCounter++ + nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize) } } -func (mm mapMetric) ContentSize() uint64 { - return mm.FileByteCounter +func (nm *baseNeedleMapper) ContentSize() uint64 { + nm.mutex.RLock() + defer nm.mutex.RUnlock() + return nm.fileByteCounter } -func (mm mapMetric) DeletedSize() uint64 { - return mm.DeletionByteCounter +func (nm *baseNeedleMapper) DeletedSize() uint64 { + nm.mutex.RLock() + defer nm.mutex.RUnlock() + return nm.deletionByteCounter } -func (mm mapMetric) FileCount() int { - return mm.FileCounter +func (nm *baseNeedleMapper) FileCount() int { + nm.mutex.RLock() + defer nm.mutex.RUnlock() + return nm.fileCounter } -func (mm mapMetric) DeletedCount() int { - return mm.DeletionCounter +func (nm *baseNeedleMapper) DeletedCount() int { + nm.mutex.RLock() + defer nm.mutex.RUnlock() + return nm.deletionCounter } -func (mm mapMetric) MaxFileKey() uint64 { - return mm.MaximumFileKey +func (nm *baseNeedleMapper) MaxFileKey() uint64 { + nm.mutex.RLock() + defer nm.mutex.RUnlock() + return nm.maximumFileKey } diff --git a/go/storage/needle_map_boltdb.go b/go/storage/needle_map_boltdb.go index e95c016bb..a08e809aa 100644 --- a/go/storage/needle_map_boltdb.go +++ b/go/storage/needle_map_boltdb.go @@ -35,7 +35,7 @@ func NewBoltDbNeedleMap(dbFileName string, indexFile *os.File) (m *BoltDbNeedleM if indexLoadError != nil { return nil, indexLoadError } - m.mapMetric = nm.mapMetric + m.baseNeedleMapper = nm.baseNeedleMapper return } diff --git a/go/storage/needle_map_leveldb.go b/go/storage/needle_map_leveldb.go index 47f63e3ae..2fd17d850 100644 --- a/go/storage/needle_map_leveldb.go +++ b/go/storage/needle_map_leveldb.go @@ -33,7 +33,7 @@ func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File) (m *LevelDbNeedl if indexLoadError != nil { return nil, indexLoadError } - m.mapMetric = nm.mapMetric + m.baseNeedleMapper = nm.baseNeedleMapper return } diff --git a/go/storage/needle_map_memory.go b/go/storage/needle_map_memory.go index 2b1fc1b54..fabbcd62d 100644 --- a/go/storage/needle_map_memory.go +++ b/go/storage/needle_map_memory.go @@ -28,27 +28,27 @@ const ( func LoadNeedleMap(file *os.File) (*NeedleMap, error) { nm := NewNeedleMap(file) e := WalkIndexFile(file, func(key uint64, offset, size uint32) error { - if key > nm.MaximumFileKey { - nm.MaximumFileKey = key + if key > nm.maximumFileKey { + nm.maximumFileKey = key } - nm.FileCounter++ - nm.FileByteCounter = nm.FileByteCounter + uint64(size) + nm.fileCounter++ + nm.fileByteCounter = nm.fileByteCounter + uint64(size) if offset > 0 { oldSize := nm.m.Set(Key(key), offset, size) glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) if oldSize > 0 { - nm.DeletionCounter++ - nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) + nm.deletionCounter++ + nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize) } } else { oldSize := nm.m.Delete(Key(key)) glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) - nm.DeletionCounter++ - nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) + nm.deletionCounter++ + nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize) } return nil }) - glog.V(1).Infoln("max file key:", nm.MaximumFileKey) + glog.V(1).Infoln("max file key:", nm.maximumFileKey) return nm, e } @@ -98,7 +98,9 @@ func (nm *NeedleMap) Delete(key uint64) error { return nm.appendToIndexFile(key, 0, 0) } func (nm *NeedleMap) Close() { - _ = nm.indexFile.Close() + nm.mutex.Lock() + nm.indexFile.Close() + nm.mutex.Unlock() } func (nm *NeedleMap) Destroy() error { nm.Close() diff --git a/go/storage/store.go b/go/storage/store.go index 6d2505ced..b2fcdedc7 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -8,6 +8,8 @@ import ( "strconv" "strings" + "sync" + "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/operation" "github.com/chrislusf/seaweedfs/go/security" @@ -80,10 +82,12 @@ type Store struct { masterNodes *MasterNodes needleMapKind NeedleMapType TaskManager *TaskManager + mutex sync.RWMutex } func (s *Store) String() (str string) { - str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d, masterNodes:%s", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.volumeSizeLimit, s.masterNodes) + str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d, masterNodes:%s", + s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.IsConnected(), s.GetVolumeSizeLimit(), s.masterNodes) return } @@ -225,7 +229,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S if maxFileKey < v.nm.MaxFileKey() { maxFileKey = v.nm.MaxFileKey() } - if !v.expired(s.volumeSizeLimit) { + if !v.expired(s.GetVolumeSizeLimit()) { volumeMessage := &operation.VolumeInformationMessage{ Id: proto.Uint32(uint32(v.Id)), Size: proto.Uint64(uint64(v.Size())), @@ -254,7 +258,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S } joinMessage := &operation.JoinMessage{ - IsInit: proto.Bool(!s.connected), + IsInit: proto.Bool(!s.IsConnected()), Ip: proto.String(s.Ip), Port: proto.Uint32(uint32(s.Port)), PublicUrl: proto.String(s.PublicUrl), @@ -288,9 +292,9 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S s.masterNodes.reset() return masterNode, "", errors.New(ret.Error) } - s.volumeSizeLimit = ret.VolumeSizeLimit + s.SetVolumeSizeLimit(ret.VolumeSizeLimit) secretKey = security.Secret(ret.SecretKey) - s.connected = true + s.SetConnected(true) return } func (s *Store) Close() { @@ -307,10 +311,10 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) { size, err = v.write(n) } else { - err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.volumeSizeLimit, v.ContentSize()) + err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.GetVolumeSizeLimit(), v.ContentSize()) } - if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) { - glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit) + if s.GetVolumeSizeLimit() < v.ContentSize()+3*uint64(size) { + glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.GetVolumeSizeLimit()) if _, _, e := s.SendHeartbeatToMaster(); e != nil { glog.V(0).Infoln("error when reporting size:", e) } @@ -350,3 +354,27 @@ func (s *Store) WalkVolume(walker VolumeWalker) error { } return nil } + +func (s *Store) GetVolumeSizeLimit() uint64 { + s.mutex.RLock() + defer s.mutex.RUnlock() + return s.volumeSizeLimit +} + +func (s *Store) SetVolumeSizeLimit(sz uint64) { + s.mutex.Lock() + defer s.mutex.Unlock() + s.volumeSizeLimit = sz +} + +func (s *Store) IsConnected() bool { + s.mutex.RLock() + defer s.mutex.RUnlock() + return s.connected +} + +func (s *Store) SetConnected(b bool) { + s.mutex.Lock() + defer s.mutex.Unlock() + s.connected = b +}