Browse Source

fix some data race problems detected by the Go race detector

pull/279/head
tnextday 10 years ago
parent
commit
a55cb02e12
  1. 93
      go/storage/needle_map.go
  2. 2
      go/storage/needle_map_boltdb.go
  3. 2
      go/storage/needle_map_leveldb.go
  4. 22
      go/storage/needle_map_memory.go
  5. 44
      go/storage/store.go

93
go/storage/needle_map.go

@@ -35,12 +35,17 @@ type NeedleMapper interface {
type baseNeedleMapper struct {
indexFile *os.File
indexFileAccessLock sync.Mutex
mapMetric
mutex sync.RWMutex
deletionCounter int `json:"DeletionCounter"`
fileCounter int `json:"FileCounter"`
deletionByteCounter uint64 `json:"DeletionByteCounter"`
fileByteCounter uint64 `json:"FileByteCounter"`
maximumFileKey uint64 `json:"MaxFileKey"`
}
func (nm baseNeedleMapper) IndexFileSize() uint64 {
func (nm *baseNeedleMapper) IndexFileSize() uint64 {
nm.mutex.RLock()
defer nm.mutex.RUnlock()
stat, err := nm.indexFile.Stat()
if err == nil {
return uint64(stat.Size())
@@ -48,7 +53,9 @@ func (nm baseNeedleMapper) IndexFileSize() uint64 {
return 0
}
func (nm baseNeedleMapper) IndexFileName() string {
func (nm *baseNeedleMapper) IndexFileName() string {
nm.mutex.RLock()
defer nm.mutex.RUnlock()
return nm.indexFile.Name()
}
@@ -58,14 +65,14 @@ func idxFileEntry(bytes []byte) (key uint64, offset uint32, size uint32) {
size = util.BytesToUint32(bytes[12:16])
return
}
func (nm baseNeedleMapper) appendToIndexFile(key uint64, offset uint32, size uint32) error {
func (nm *baseNeedleMapper) appendToIndexFile(key uint64, offset uint32, size uint32) error {
bytes := make([]byte, 16)
util.Uint64toBytes(bytes[0:8], key)
util.Uint32toBytes(bytes[8:12], offset)
util.Uint32toBytes(bytes[12:16], size)
nm.indexFileAccessLock.Lock()
defer nm.indexFileAccessLock.Unlock()
nm.mutex.Lock()
defer nm.mutex.Unlock()
if _, err := nm.indexFile.Seek(0, 2); err != nil {
return fmt.Errorf("cannot seek end of indexfile %s: %v",
nm.indexFile.Name(), err)
@@ -73,51 +80,55 @@ func (nm baseNeedleMapper) appendToIndexFile(key uint64, offset uint32, size uin
_, err := nm.indexFile.Write(bytes)
return err
}
func (nm baseNeedleMapper) IndexFileContent() ([]byte, error) {
nm.indexFileAccessLock.Lock()
defer nm.indexFileAccessLock.Unlock()
func (nm *baseNeedleMapper) IndexFileContent() ([]byte, error) {
nm.mutex.RLock()
defer nm.mutex.RUnlock()
return ioutil.ReadFile(nm.indexFile.Name())
}
type mapMetric struct {
indexFile *os.File
DeletionCounter int `json:"DeletionCounter"`
FileCounter int `json:"FileCounter"`
DeletionByteCounter uint64 `json:"DeletionByteCounter"`
FileByteCounter uint64 `json:"FileByteCounter"`
MaximumFileKey uint64 `json:"MaxFileKey"`
}
func (mm *mapMetric) logDelete(deletedByteCount uint32) {
mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(deletedByteCount)
mm.DeletionCounter++
func (nm *baseNeedleMapper) logDelete(deletedByteCount uint32) {
nm.mutex.Lock()
defer nm.mutex.Unlock()
nm.deletionByteCounter = nm.deletionByteCounter + uint64(deletedByteCount)
nm.deletionCounter++
}
func (mm *mapMetric) logPut(key uint64, oldSize uint32, newSize uint32) {
if key > mm.MaximumFileKey {
mm.MaximumFileKey = key
func (nm *baseNeedleMapper) logPut(key uint64, oldSize uint32, newSize uint32) {
nm.mutex.Lock()
defer nm.mutex.Unlock()
if key > nm.maximumFileKey {
nm.maximumFileKey = key
}
mm.FileCounter++
mm.FileByteCounter = mm.FileByteCounter + uint64(newSize)
nm.fileCounter++
nm.fileByteCounter = nm.fileByteCounter + uint64(newSize)
if oldSize > 0 {
mm.DeletionCounter++
mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(oldSize)
nm.deletionCounter++
nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize)
}
}
func (mm mapMetric) ContentSize() uint64 {
return mm.FileByteCounter
func (nm *baseNeedleMapper) ContentSize() uint64 {
nm.mutex.RLock()
defer nm.mutex.RUnlock()
return nm.fileByteCounter
}
func (mm mapMetric) DeletedSize() uint64 {
return mm.DeletionByteCounter
func (nm *baseNeedleMapper) DeletedSize() uint64 {
nm.mutex.RLock()
defer nm.mutex.RUnlock()
return nm.deletionByteCounter
}
func (mm mapMetric) FileCount() int {
return mm.FileCounter
func (nm *baseNeedleMapper) FileCount() int {
nm.mutex.RLock()
defer nm.mutex.RUnlock()
return nm.fileCounter
}
func (mm mapMetric) DeletedCount() int {
return mm.DeletionCounter
func (nm *baseNeedleMapper) DeletedCount() int {
nm.mutex.RLock()
defer nm.mutex.RUnlock()
return nm.deletionCounter
}
func (mm mapMetric) MaxFileKey() uint64 {
return mm.MaximumFileKey
func (nm *baseNeedleMapper) MaxFileKey() uint64 {
nm.mutex.RLock()
defer nm.mutex.RUnlock()
return nm.maximumFileKey
}

2
go/storage/needle_map_boltdb.go

@@ -35,7 +35,7 @@ func NewBoltDbNeedleMap(dbFileName string, indexFile *os.File) (m *BoltDbNeedleM
if indexLoadError != nil {
return nil, indexLoadError
}
m.mapMetric = nm.mapMetric
m.baseNeedleMapper = nm.baseNeedleMapper
return
}

2
go/storage/needle_map_leveldb.go

@@ -33,7 +33,7 @@ func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File) (m *LevelDbNeedl
if indexLoadError != nil {
return nil, indexLoadError
}
m.mapMetric = nm.mapMetric
m.baseNeedleMapper = nm.baseNeedleMapper
return
}

22
go/storage/needle_map_memory.go

@@ -28,27 +28,27 @@ const (
func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
nm := NewNeedleMap(file)
e := WalkIndexFile(file, func(key uint64, offset, size uint32) error {
if key > nm.MaximumFileKey {
nm.MaximumFileKey = key
if key > nm.maximumFileKey {
nm.maximumFileKey = key
}
nm.FileCounter++
nm.FileByteCounter = nm.FileByteCounter + uint64(size)
nm.fileCounter++
nm.fileByteCounter = nm.fileByteCounter + uint64(size)
if offset > 0 {
oldSize := nm.m.Set(Key(key), offset, size)
glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
if oldSize > 0 {
nm.DeletionCounter++
nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
nm.deletionCounter++
nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize)
}
} else {
oldSize := nm.m.Delete(Key(key))
glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
nm.DeletionCounter++
nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
nm.deletionCounter++
nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize)
}
return nil
})
glog.V(1).Infoln("max file key:", nm.MaximumFileKey)
glog.V(1).Infoln("max file key:", nm.maximumFileKey)
return nm, e
}
@@ -98,7 +98,9 @@ func (nm *NeedleMap) Delete(key uint64) error {
return nm.appendToIndexFile(key, 0, 0)
}
func (nm *NeedleMap) Close() {
_ = nm.indexFile.Close()
nm.mutex.Lock()
nm.indexFile.Close()
nm.mutex.Unlock()
}
func (nm *NeedleMap) Destroy() error {
nm.Close()

44
go/storage/store.go

@@ -8,6 +8,8 @@ import (
"strconv"
"strings"
"sync"
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/operation"
"github.com/chrislusf/seaweedfs/go/security"
@@ -80,10 +82,12 @@ type Store struct {
masterNodes *MasterNodes
needleMapKind NeedleMapType
TaskManager *TaskManager
mutex sync.RWMutex
}
func (s *Store) String() (str string) {
str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d, masterNodes:%s", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.volumeSizeLimit, s.masterNodes)
str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d, masterNodes:%s",
s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.IsConnected(), s.GetVolumeSizeLimit(), s.masterNodes)
return
}
@@ -225,7 +229,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
if maxFileKey < v.nm.MaxFileKey() {
maxFileKey = v.nm.MaxFileKey()
}
if !v.expired(s.volumeSizeLimit) {
if !v.expired(s.GetVolumeSizeLimit()) {
volumeMessage := &operation.VolumeInformationMessage{
Id: proto.Uint32(uint32(v.Id)),
Size: proto.Uint64(uint64(v.Size())),
@@ -254,7 +258,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
}
joinMessage := &operation.JoinMessage{
IsInit: proto.Bool(!s.connected),
IsInit: proto.Bool(!s.IsConnected()),
Ip: proto.String(s.Ip),
Port: proto.Uint32(uint32(s.Port)),
PublicUrl: proto.String(s.PublicUrl),
@@ -288,9 +292,9 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
s.masterNodes.reset()
return masterNode, "", errors.New(ret.Error)
}
s.volumeSizeLimit = ret.VolumeSizeLimit
s.SetVolumeSizeLimit(ret.VolumeSizeLimit)
secretKey = security.Secret(ret.SecretKey)
s.connected = true
s.SetConnected(true)
return
}
func (s *Store) Close() {
@@ -307,10 +311,10 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) {
size, err = v.write(n)
} else {
err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.volumeSizeLimit, v.ContentSize())
err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
}
if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) {
glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit)
if s.GetVolumeSizeLimit() < v.ContentSize()+3*uint64(size) {
glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.GetVolumeSizeLimit())
if _, _, e := s.SendHeartbeatToMaster(); e != nil {
glog.V(0).Infoln("error when reporting size:", e)
}
@@ -350,3 +354,27 @@ func (s *Store) WalkVolume(walker VolumeWalker) error {
}
return nil
}
func (s *Store) GetVolumeSizeLimit() uint64 {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.volumeSizeLimit
}
func (s *Store) SetVolumeSizeLimit(sz uint64) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.volumeSizeLimit = sz
}
func (s *Store) IsConnected() bool {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.connected
}
func (s *Store) SetConnected(b bool) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.connected = b
}
Loading…
Cancel
Save