From 82d9d8687ba6f857bcc94002d664dfa86ce041d3 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Thu, 5 Feb 2026 10:24:18 -0800
Subject: [PATCH] Fix concurrent map access in EC shards info (#8222)

* fix concurrent map access in EC shards info #8219

* refactor: simplify Disk.ToDiskInfo to use ecShards snapshot and avoid redundant locking

* refactor: improve GetEcShards with pre-allocation and defer
---
 .../erasure_coding/ec_shards_info_test.go   | 44 +++++++++++++++++++
 weed/topology/disk_ec.go                    |  5 ++-
 weed/topology/race_condition_stress_test.go | 44 +++++++++++++++++++
 3 files changed, 91 insertions(+), 2 deletions(-)

diff --git a/weed/storage/erasure_coding/ec_shards_info_test.go b/weed/storage/erasure_coding/ec_shards_info_test.go
index 523217417..f96fbbca6 100644
--- a/weed/storage/erasure_coding/ec_shards_info_test.go
+++ b/weed/storage/erasure_coding/ec_shards_info_test.go
@@ -1,6 +1,7 @@
 package erasure_coding
 
 import (
+	"sync"
 	"testing"
 	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
 )
@@ -364,3 +365,46 @@ func BenchmarkShardsInfo_Size(b *testing.B) {
 		si.Size(ShardId(i % TotalShardsCount))
 	}
 }
+
+func TestShardsInfo_ConcurrentAccess(t *testing.T) {
+	si := NewShardsInfo()
+
+	var wg sync.WaitGroup
+	wg.Add(3)
+
+	// Goroutine 1: Continuously Set/Delete shards
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 1000; i++ {
+			si.Set(ShardInfo{Id: ShardId(i % TotalShardsCount), Size: 100})
+			if i%10 == 0 {
+				si.Delete(ShardId((i / 10) % TotalShardsCount))
+			}
+		}
+	}()
+
+	// Goroutine 2: Continuously read Info (Sizes, Bitmap, Count)
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 1000; i++ {
+			si.Sizes()
+			si.Bitmap()
+			si.Count()
+			si.TotalSize()
+		}
+	}()
+
+	// Goroutine 3: Continuously Add/Subtract from another ShardsInfo
+	go func() {
+		defer wg.Done()
+		other := NewShardsInfo()
+		other.Set(ShardInfo{Id: 1, Size: 100})
+		other.Set(ShardInfo{Id: 2, Size: 200})
+		for i := 0; i < 1000; i++ {
+			si.Add(other)
+			si.Subtract(other)
+		}
+	}()
+
+	wg.Wait()
+}
diff --git a/weed/topology/disk_ec.go b/weed/topology/disk_ec.go
index a783b309e..6ad3ac958 100644
--- a/weed/topology/disk_ec.go
+++ b/weed/topology/disk_ec.go
@@ -7,11 +7,12 @@ import (
 )
 
 func (d *Disk) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) {
-	d.RLock()
+	d.ecShardsLock.RLock()
+	defer d.ecShardsLock.RUnlock()
+	ret = make([]*erasure_coding.EcVolumeInfo, 0, len(d.ecShards))
 	for _, ecVolumeInfo := range d.ecShards {
 		ret = append(ret, ecVolumeInfo)
 	}
-	d.RUnlock()
 	return ret
 }
 
diff --git a/weed/topology/race_condition_stress_test.go b/weed/topology/race_condition_stress_test.go
index 79c460590..6d8d9260c 100644
--- a/weed/topology/race_condition_stress_test.go
+++ b/weed/topology/race_condition_stress_test.go
@@ -8,6 +8,8 @@ import (
 	"time"
 
 	"github.com/seaweedfs/seaweedfs/weed/sequence"
+	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
+	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
 	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
 	"github.com/seaweedfs/seaweedfs/weed/storage/types"
 )
@@ -304,3 +306,45 @@
 		t.Logf("Performance test passed: %v per reservation", avgDuration)
 	}
 }
+
+func TestDisk_GetEcShards_Race(t *testing.T) {
+	d := NewDisk("hdd")
+
+	// Pre-populate with one shard
+	initialShard := &erasure_coding.EcVolumeInfo{
+		VolumeId:   needle.VolumeId(1),
+		ShardsInfo: erasure_coding.NewShardsInfo(),
+	}
+	initialShard.ShardsInfo.Set(erasure_coding.ShardInfo{Id: 0, Size: 100})
+	d.AddOrUpdateEcShard(initialShard)
+
+	var wg sync.WaitGroup
+	wg.Add(10)
+
+	// Goroutine 1-5: Continuously read shards
+	for j := 0; j < 5; j++ {
+		go func() {
+			defer wg.Done()
+			for i := 0; i < 10000; i++ {
+				d.GetEcShards()
+			}
+		}()
+	}
+
+	// Goroutine 6-10: Continuously update shards
+	for j := 0; j < 5; j++ {
+		go func() {
+			defer wg.Done()
+			for i := 0; i < 10000; i++ {
+				shard := &erasure_coding.EcVolumeInfo{
+					VolumeId:   needle.VolumeId(i % 100),
+					ShardsInfo: erasure_coding.NewShardsInfo(),
+				}
+				shard.ShardsInfo.Set(erasure_coding.ShardInfo{Id: erasure_coding.ShardId(i % 14), Size: 100})
+				d.AddOrUpdateEcShard(shard)
+			}
+		}()
+	}
+
+	wg.Wait()
+}
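
Note for readers outside the SeaweedFS codebase: the sketch below is a minimal, self-contained illustration of the locking pattern the disk_ec.go hunk applies, not SeaweedFS code. A dedicated sync.RWMutex guards the map (analogous to ecShardsLock guarding d.ecShards), writers take the write lock, and readers hold the read lock via defer while building a pre-allocated snapshot slice. The shardRegistry type, its fields, and its methods are hypothetical names chosen only for this example.

package main

import (
	"fmt"
	"sync"
)

// shardRegistry is a stand-in for a structure like topology.Disk: a map that is
// mutated by some goroutines and snapshotted by others.
type shardRegistry struct {
	mu     sync.RWMutex // guards shards, playing the role of ecShardsLock
	shards map[int]int64
}

// set takes the write lock before mutating the map, as AddOrUpdateEcShard must.
func (r *shardRegistry) set(id int, size int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.shards == nil {
		r.shards = make(map[int]int64)
	}
	r.shards[id] = size
}

// snapshot mirrors the fixed GetEcShards: hold the read lock with defer,
// pre-allocate the result to len(map), and return a copy so callers never
// iterate the live map.
func (r *shardRegistry) snapshot() []int64 {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]int64, 0, len(r.shards))
	for _, size := range r.shards {
		out = append(out, size)
	}
	return out
}

func main() {
	var r shardRegistry
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // writer: keeps updating shard sizes
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			r.set(i%14, int64(i))
		}
	}()
	go func() { // reader: keeps taking snapshots
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			_ = r.snapshot()
		}
	}()
	wg.Wait()
	fmt.Println("shards:", len(r.snapshot()))
}

Run concurrently under the race detector (go run -race, or go test -race for the tests in this patch), the reader and writer stay clean because readers only ever touch the copied slice, never the live map. That is the same property TestShardsInfo_ConcurrentAccess and TestDisk_GetEcShards_Race exercise against the real types.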