From 6e20f65528f227f5151eef474362000ee1e62052 Mon Sep 17 00:00:00 2001 From: tnextday Date: Sat, 12 Mar 2016 19:51:56 +0800 Subject: [PATCH] fix some data race problem detect by go race --- go/stats/duration_counter.go | 48 ++++++++++++++++++++------------ go/storage/store.go | 8 +++--- go/storage/volume.go | 46 ++++++++++++++++++++---------- go/storage/volume_super_block.go | 4 +-- go/storage/volume_vacuum.go | 4 +-- 5 files changed, 69 insertions(+), 41 deletions(-) diff --git a/go/stats/duration_counter.go b/go/stats/duration_counter.go index 69c8be61d..f920a8c4d 100644 --- a/go/stats/duration_counter.go +++ b/go/stats/duration_counter.go @@ -1,6 +1,7 @@ package stats import ( + "sync" "time" ) @@ -14,28 +15,33 @@ func NewTimedValue(t time.Time, val int64) *TimedValue { } type RoundRobinCounter struct { - LastIndex int - Values []int64 - Counts []int64 + lastIndex int + values []int64 + counts []int64 + mutex sync.RWMutex } func NewRoundRobinCounter(slots int) *RoundRobinCounter { - return &RoundRobinCounter{LastIndex: -1, Values: make([]int64, slots), Counts: make([]int64, slots)} + return &RoundRobinCounter{lastIndex: -1, values: make([]int64, slots), counts: make([]int64, slots)} } func (rrc *RoundRobinCounter) Add(index int, val int64) { - if index >= len(rrc.Values) { + rrc.mutex.Lock() + defer rrc.mutex.Unlock() + if index >= len(rrc.values) { return } - for rrc.LastIndex != index { - rrc.LastIndex = (rrc.LastIndex + 1) % len(rrc.Values) - rrc.Values[rrc.LastIndex] = 0 - rrc.Counts[rrc.LastIndex] = 0 + for rrc.lastIndex != index { + rrc.lastIndex = (rrc.lastIndex + 1) % len(rrc.values) + rrc.values[rrc.lastIndex] = 0 + rrc.counts[rrc.lastIndex] = 0 } - rrc.Values[index] += val - rrc.Counts[index]++ + rrc.values[index] += val + rrc.counts[index]++ } func (rrc *RoundRobinCounter) Max() (max int64) { - for _, val := range rrc.Values { + rrc.mutex.RLock() + defer rrc.mutex.RUnlock() + for _, val := range rrc.values { if max < val { max = val } @@ -43,28 +49,34 @@ func (rrc *RoundRobinCounter) Max() (max int64) { return } func (rrc *RoundRobinCounter) Count() (cnt int64) { - for _, c := range rrc.Counts { + rrc.mutex.RLock() + defer rrc.mutex.RUnlock() + for _, c := range rrc.counts { cnt += c } return } func (rrc *RoundRobinCounter) Sum() (sum int64) { - for _, val := range rrc.Values { + rrc.mutex.RLock() + defer rrc.mutex.RUnlock() + for _, val := range rrc.values { sum += val } return } func (rrc *RoundRobinCounter) ToList() (ret []int64) { - index := rrc.LastIndex - step := len(rrc.Values) + rrc.mutex.RLock() + defer rrc.mutex.RUnlock() + index := rrc.lastIndex + step := len(rrc.values) for step > 0 { step-- index++ - if index >= len(rrc.Values) { + if index >= len(rrc.values) { index = 0 } - ret = append(ret, rrc.Values[index]) + ret = append(ret, rrc.values[index]) } return } diff --git a/go/storage/store.go b/go/storage/store.go index 83f20700d..6d2505ced 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -190,7 +190,7 @@ func (s *Store) Status() []*VolumeInfo { FileCount: v.nm.FileCount(), DeleteCount: v.nm.DeletedCount(), DeletedByteCount: v.nm.DeletedSize(), - ReadOnly: v.readOnly, + ReadOnly: v.IsReadOnly(), Ttl: v.Ttl} stats = append(stats, s) return nil @@ -233,7 +233,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S FileCount: proto.Uint64(uint64(v.nm.FileCount())), DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())), DeletedByteCount: proto.Uint64(v.nm.DeletedSize()), - ReadOnly: proto.Bool(v.readOnly), + ReadOnly: proto.Bool(v.IsReadOnly()), Version: proto.Uint32(uint32(v.Version())), Ttl: proto.Uint32(v.Ttl.ToUint32()), } @@ -300,7 +300,7 @@ func (s *Store) Close() { } func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { if v := s.findVolume(i); v != nil { - if v.readOnly { + if v.IsReadOnly() { err = fmt.Errorf("Volume %d is read only", i) return } @@ -322,7 +322,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { return } func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) { - if v := s.findVolume(i); v != nil && !v.readOnly { + if v := s.findVolume(i); v != nil && !v.IsReadOnly() { return v.delete(n) } return 0, nil diff --git a/go/storage/volume.go b/go/storage/volume.go index a51543cc8..31531d4d7 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -24,8 +24,8 @@ type Volume struct { SuperBlock - dataFileAccessLock sync.RWMutex - lastModifiedTime uint64 //unix time in seconds + mutex sync.RWMutex + lastModifiedTime uint64 //unix time in seconds } func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, ttl *TTL) (v *Volume, e error) { @@ -36,6 +36,8 @@ func NewVolume(dirname string, collection string, id VolumeId, needleMapKind Nee return } func (v *Volume) String() string { + v.mutex.RLock() + defer v.mutex.RUnlock() return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly) } @@ -131,20 +133,22 @@ func (v *Volume) Version() Version { return v.SuperBlock.Version() } func (v *Volume) Size() int64 { + v.mutex.RLock() + defer v.mutex.RUnlock() stat, e := v.dataFile.Stat() if e == nil { return stat.Size() } - glog.V(0).Infof("Failed to read file size %s %v", v.dataFile.Name(), e) + glog.V(2).Infof("Failed to read file size %s %v", v.dataFile.Name(), e) return -1 } // Close cleanly shuts down this volume func (v *Volume) Close() { - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() + v.mutex.Lock() + defer v.mutex.Unlock() v.nm.Close() - _ = v.dataFile.Close() + v.dataFile.Close() } // isFileUnchanged checks whether this needle to write is same as last one. @@ -171,7 +175,7 @@ func (v *Volume) isFileUnchanged(n *Needle) bool { // Destroy removes everything related to this volume func (v *Volume) Destroy() (err error) { - if v.readOnly { + if v.IsReadOnly() { err = fmt.Errorf("%s is read-only", v.dataFile.Name()) return } @@ -186,12 +190,12 @@ func (v *Volume) Destroy() (err error) { // AppendBlob append a blob to end of the data file, used in replication func (v *Volume) AppendBlob(b []byte) (offset int64, err error) { + v.mutex.Lock() + defer v.mutex.Unlock() if v.readOnly { err = fmt.Errorf("%s is read-only", v.dataFile.Name()) return } - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() if offset, err = v.dataFile.Seek(0, 2); err != nil { glog.V(0).Infof("failed to seek the end of file: %v", err) return @@ -209,13 +213,13 @@ func (v *Volume) AppendBlob(b []byte) (offset int64, err error) { } func (v *Volume) write(n *Needle) (size uint32, err error) { + v.mutex.Lock() + defer v.mutex.Unlock() glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String()) if v.readOnly { err = fmt.Errorf("%s is read-only", v.dataFile.Name()) return } - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() if v.isFileUnchanged(n) { size = n.DataSize glog.V(4).Infof("needle is unchanged!") @@ -255,12 +259,12 @@ func (v *Volume) write(n *Needle) (size uint32, err error) { } func (v *Volume) delete(n *Needle) (uint32, error) { + v.mutex.Lock() + defer v.mutex.Unlock() glog.V(4).Infof("delete needle %s", NewFileIdFromNeedle(v.Id, n).String()) if v.readOnly { return 0, fmt.Errorf("%s is read-only", v.dataFile.Name()) } - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() nv, ok := v.nm.Get(n.Id) //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size) if ok { @@ -284,9 +288,9 @@ func (v *Volume) readNeedle(n *Needle) (int, error) { if !ok || nv.Offset == 0 { return -1, errors.New("Not Found") } - v.dataFileAccessLock.RLock() + v.mutex.RLock() err := n.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) - v.dataFileAccessLock.RUnlock() + v.mutex.RUnlock() if err != nil { return 0, err } @@ -392,6 +396,8 @@ func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Ti // or when the volume does not have a ttl // or when volumeSizeLimit is 0 when server just starts func (v *Volume) expired(volumeSizeLimit uint64) bool { + v.mutex.RLock() + defer v.mutex.RUnlock() if volumeSizeLimit == 0 { //skip if we don't know size limit return false @@ -413,6 +419,8 @@ func (v *Volume) expired(volumeSizeLimit uint64) bool { // wait either maxDelayMinutes or 10% of ttl minutes func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool { + v.mutex.RLock() + defer v.mutex.RUnlock() if v.Ttl == nil || v.Ttl.Minutes() == 0 { return false } @@ -428,6 +436,8 @@ func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool { } func (v *Volume) SetReadOnly(isReadOnly bool) error { + v.mutex.Lock() + defer v.mutex.Unlock() if isReadOnly == false { if fi, e := v.dataFile.Stat(); e != nil { return e @@ -440,3 +450,9 @@ func (v *Volume) SetReadOnly(isReadOnly bool) error { v.readOnly = isReadOnly return nil } + +func (v *Volume) IsReadOnly() bool { + v.mutex.RLock() + defer v.mutex.RUnlock() + return v.readOnly +} diff --git a/go/storage/volume_super_block.go b/go/storage/volume_super_block.go index 7c1ea0eb2..d6e9e8395 100644 --- a/go/storage/volume_super_block.go +++ b/go/storage/volume_super_block.go @@ -74,8 +74,8 @@ func (v *Volume) readSuperBlock() (err error) { } func (v *Volume) writeSuperBlock() (err error) { - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() + v.mutex.Lock() + defer v.mutex.Unlock() if _, e := v.dataFile.WriteAt(v.SuperBlock.Bytes(), 0); e != nil { return fmt.Errorf("cannot write volume %d super block: %v", v.Id, e) } diff --git a/go/storage/volume_vacuum.go b/go/storage/volume_vacuum.go index 3941a568f..646266a4c 100644 --- a/go/storage/volume_vacuum.go +++ b/go/storage/volume_vacuum.go @@ -25,8 +25,8 @@ func (v *Volume) Compact() error { } func (v *Volume) commitCompact() error { glog.V(3).Infof("Committing vacuuming...") - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() + v.mutex.Lock() + defer v.mutex.Unlock() glog.V(3).Infof("Got Committing lock...") _ = v.dataFile.Close() var e error