
Fix concurrent map access in EC shards info (#8222)

* fix concurrent map access in EC shards info #8219

* refactor: simplify Disk.ToDiskInfo to use ecShards snapshot and avoid redundant locking

* refactor: improve GetEcShards with pre-allocation and defer
Chris Lu authored 2 days ago, committed via GitHub
commit 82d9d8687b (branch pull/8225/head)
GPG signature: key ID B5690EEEBB952194, no known key found for this signature in database
3 changed files:

  44  weed/storage/erasure_coding/ec_shards_info_test.go
   5  weed/topology/disk_ec.go
  44  weed/topology/race_condition_stress_test.go
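
The bug described in the commit message is the classic unsynchronized Go map: one goroutine iterates the EC shard map while another mutates it, which the runtime treats as fatal. Before the per-file diffs, a minimal sketch of the pattern the fix relies on, using hypothetical names rather than the real ShardsInfo internals: every accessor goes through one sync.RWMutex, and read paths hand back a copied snapshot instead of the live map.

	package sketch

	import "sync"

	// shardMap is an illustrative stand-in for a ShardsInfo-like container.
	type shardMap struct {
		mu     sync.RWMutex
		shards map[int]uint64 // shard id -> shard size
	}

	func newShardMap() *shardMap {
		return &shardMap{shards: make(map[int]uint64)}
	}

	// Set writes under the exclusive lock.
	func (s *shardMap) Set(id int, size uint64) {
		s.mu.Lock()
		defer s.mu.Unlock()
		s.shards[id] = size
	}

	// Sizes reads under the shared lock and returns a copied snapshot,
	// so callers never iterate the live map.
	func (s *shardMap) Sizes() map[int]uint64 {
		s.mu.RLock()
		defer s.mu.RUnlock()
		snapshot := make(map[int]uint64, len(s.shards))
		for id, size := range s.shards {
			snapshot[id] = size
		}
		return snapshot
	}

Whether ShardsInfo stores sizes exactly this way is an assumption; the point carried into the diffs below is that Set, Delete, Sizes, Bitmap, Add, and Subtract must all share the same lock.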

weed/storage/erasure_coding/ec_shards_info_test.go (44 changed lines)

@@ -1,6 +1,7 @@
 package erasure_coding
 
 import (
+	"sync"
 	"testing"
 
 	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
@@ -364,3 +365,46 @@ func BenchmarkShardsInfo_Size(b *testing.B) {
 		si.Size(ShardId(i % TotalShardsCount))
 	}
 }
+
+func TestShardsInfo_ConcurrentAccess(t *testing.T) {
+	si := NewShardsInfo()
+
+	var wg sync.WaitGroup
+	wg.Add(3)
+
+	// Goroutine 1: Continuously Set/Delete shards
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 1000; i++ {
+			si.Set(ShardInfo{Id: ShardId(i % TotalShardsCount), Size: 100})
+			if i%10 == 0 {
+				si.Delete(ShardId((i / 10) % TotalShardsCount))
+			}
+		}
+	}()
+
+	// Goroutine 2: Continuously read Info (Sizes, Bitmap, Count)
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 1000; i++ {
+			si.Sizes()
+			si.Bitmap()
+			si.Count()
+			si.TotalSize()
+		}
+	}()
+
+	// Goroutine 3: Continuously Add/Subtract from another ShardsInfo
+	go func() {
+		defer wg.Done()
+		other := NewShardsInfo()
+		other.Set(ShardInfo{Id: 1, Size: 100})
+		other.Set(ShardInfo{Id: 2, Size: 200})
+		for i := 0; i < 1000; i++ {
+			si.Add(other)
+			si.Subtract(other)
+		}
+	}()
+
+	wg.Wait()
+}
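
The test above has no assertions; it exists to be run under the race detector, e.g. go test -race -run TestShardsInfo_ConcurrentAccess ./weed/storage/erasure_coding/. Without -race, unsynchronized map access may pass silently or die nondeterministically with the runtime's concurrent-map-write fatal error, so the detector is what turns this into a reliable regression guard.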

weed/topology/disk_ec.go (5 changed lines)

@@ -7,11 +7,12 @@ import (
 )
 
 func (d *Disk) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) {
-	d.RLock()
+	d.ecShardsLock.RLock()
+	defer d.ecShardsLock.RUnlock()
+	ret = make([]*erasure_coding.EcVolumeInfo, 0, len(d.ecShards))
 	for _, ecVolumeInfo := range d.ecShards {
 		ret = append(ret, ecVolumeInfo)
 	}
-	d.RUnlock()
 	return ret
 }
 

weed/topology/race_condition_stress_test.go (44 changed lines)

@@ -8,6 +8,8 @@ import (
 	"time"
 
 	"github.com/seaweedfs/seaweedfs/weed/sequence"
+	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
+	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
 	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
 	"github.com/seaweedfs/seaweedfs/weed/storage/types"
 )
@@ -304,3 +306,45 @@ func TestReservationSystemPerformance(t *testing.T) {
 		t.Logf("Performance test passed: %v per reservation", avgDuration)
 	}
 }
+
+func TestDisk_GetEcShards_Race(t *testing.T) {
+	d := NewDisk("hdd")
+
+	// Pre-populate with one shard
+	initialShard := &erasure_coding.EcVolumeInfo{
+		VolumeId:   needle.VolumeId(1),
+		ShardsInfo: erasure_coding.NewShardsInfo(),
+	}
+	initialShard.ShardsInfo.Set(erasure_coding.ShardInfo{Id: 0, Size: 100})
+	d.AddOrUpdateEcShard(initialShard)
+
+	var wg sync.WaitGroup
+	wg.Add(10)
+
+	// Goroutine 1-5: Continuously read shards
+	for j := 0; j < 5; j++ {
+		go func() {
+			defer wg.Done()
+			for i := 0; i < 10000; i++ {
+				d.GetEcShards()
+			}
+		}()
+	}
+
+	// Goroutine 6-10: Continuously update shards
+	for j := 0; j < 5; j++ {
+		go func() {
+			defer wg.Done()
+			for i := 0; i < 10000; i++ {
+				shard := &erasure_coding.EcVolumeInfo{
+					VolumeId:   needle.VolumeId(i % 100),
+					ShardsInfo: erasure_coding.NewShardsInfo(),
+				}
+				shard.ShardsInfo.Set(erasure_coding.ShardInfo{Id: erasure_coding.ShardId(i % 14), Size: 100})
+				d.AddOrUpdateEcShard(shard)
+			}
+		}()
+	}
+
+	wg.Wait()
+}
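
Same idea at the topology level: five readers hammer GetEcShards while five writers call AddOrUpdateEcShard on one Disk, so any access to d.ecShards outside ecShardsLock gets flagged when this stress test runs under go test -race in weed/topology.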