diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go index 3a3935244..f931bf397 100644 --- a/weed/storage/needle_map_leveldb.go +++ b/weed/storage/needle_map_leveldb.go @@ -132,15 +132,17 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error { } func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) { - bytes := make([]byte, NeedleIdSize) if m.ldbTimeout > 0 { - m.ldbAccessLock.RLock() - defer m.ldbAccessLock.RUnlock() - loadErr := reloadLdb(m) - if loadErr != nil { + if err := m.ensureLdbLoaded(); err != nil { return nil, false } + defer m.ldbAccessLock.RUnlock() } + return m.getFromDb(key) +} + +func (m *LevelDbNeedleMap) getFromDb(key NeedleId) (element *needle_map.NeedleValue, ok bool) { + bytes := make([]byte, NeedleIdSize) NeedleIdToBytes(bytes[0:NeedleIdSize], key) data, err := m.db.Get(bytes, nil) if err != nil || len(data) != OffsetSize+SizeSize { @@ -155,14 +157,12 @@ func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size Size) error { var oldSize Size var watermark uint64 if m.ldbTimeout > 0 { - m.ldbAccessLock.RLock() - defer m.ldbAccessLock.RUnlock() - loadErr := reloadLdb(m) - if loadErr != nil { - return loadErr + if err := m.ensureLdbLoaded(); err != nil { + return err } + defer m.ldbAccessLock.RUnlock() } - if oldNeedle, ok := m.Get(key); ok { + if oldNeedle, ok := m.getFromDb(key); ok { oldSize = oldNeedle.Size } m.logPut(key, oldSize, size) @@ -222,14 +222,12 @@ func levelDbDelete(db *leveldb.DB, key NeedleId) error { func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error { var watermark uint64 if m.ldbTimeout > 0 { - m.ldbAccessLock.RLock() - defer m.ldbAccessLock.RUnlock() - loadErr := reloadLdb(m) - if loadErr != nil { - return loadErr + if err := m.ensureLdbLoaded(); err != nil { + return err } + defer m.ldbAccessLock.RUnlock() } - oldNeedle, found := m.Get(key) + oldNeedle, found := m.getFromDb(key) if !found || oldNeedle.Size.IsDeleted() { return nil } @@ -400,6 +398,24 @@ func (m *LevelDbNeedleMap) DoOffsetLoading(v *Volume, indexFile *os.File, startF return err } +func (m *LevelDbNeedleMap) ensureLdbLoaded() error { + for { + m.ldbAccessLock.RLock() + if m.db != nil { + return nil + } + m.ldbAccessLock.RUnlock() + m.ldbAccessLock.Lock() + if m.db == nil { + if err := reloadLdb(m); err != nil { + m.ldbAccessLock.Unlock() + return err + } + } + m.ldbAccessLock.Unlock() + } +} + func reloadLdb(m *LevelDbNeedleMap) (err error) { if m.db != nil { return nil diff --git a/weed/storage/needle_map_leveldb_test.go b/weed/storage/needle_map_leveldb_test.go new file mode 100644 index 000000000..50f1b04ea --- /dev/null +++ b/weed/storage/needle_map_leveldb_test.go @@ -0,0 +1,90 @@ +package storage + +import ( + "fmt" + "os" + "path/filepath" + "sync" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/storage/types" +) + +func TestLevelDbNeedleMap_Concurrency(t *testing.T) { + dir, err := os.MkdirTemp("", "test_leveldb_concurrency") + if err != nil { + t.Fatalf("temp dir: %v", err) + } + defer os.RemoveAll(dir) + + prefix := "test" + indexFile, err := os.Create(filepath.Join(dir, prefix+".idx")) + if err != nil { + t.Fatalf("create index file: %v", err) + } + dbFileName := filepath.Join(dir, prefix+".ldb") + + // Create and initialize map + m, err := NewLevelDbNeedleMap(dbFileName, indexFile, nil, 1) + if err != nil { + t.Fatalf("NewLevelDbNeedleMap: %v", err) + } + defer m.Close() + + // Pre-populate some data + key := types.NeedleId(1) + if err := m.Put(key, types.ToOffset(100), types.Size(200)); err != nil { + t.Fatalf("Put: %v", err) + } + + // Force unload to start from nil state + if err := unloadLdb(m); err != nil { + t.Fatalf("unloadLdb: %v", err) + } + + var wg sync.WaitGroup + startCh := make(chan struct{}) + errCh := make(chan error, 100) + + // Spawn multiple goroutines to trigger the race + // Multiple readers will see m.db == nil and try to reload concurrently + for i := 0; i < 2; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + <-startCh + + // Try multiple times to increase chance of collision + for j := 0; j < 2; j++ { + _, ok := m.Get(key) + if !ok { + // Get failed, possibly due to race in reload. + // But we also put data concurrently, so maybe it's missing if deleted? + // In this test, we only Put, never Delete. So Key 1 should be there. + // However, if DB reload fails, Get returns false! + errCh <- fmt.Errorf("routine %d iter %d: Get returned false", id, j) + } + + // Also try Put concurrently + err := m.Put(types.NeedleId(2+id), types.ToOffset(100), types.Size(200)) + if err != nil { + errCh <- fmt.Errorf("routine %d iter %d: Put failed: %v", id, j, err) + } + + // Manually unload occasionally to reset the state + if j%2 == 0 { + // This might fail if locked, but that's fine + unloadLdb(m) + } + } + }(i) + } + + close(startCh) + wg.Wait() + close(errCh) + + for e := range errCh { + t.Errorf("Error encountered: %v", e) + } +}