Browse Source

avoid data race of TraverseBfs (#3856)

* avoid data race of TraverseBfs

* close is enough
avoid panic
I1014 12:29:59.207120 volume_loading.go:131 loading sorted db /tmp/sw/test2_19.sdx error: unexpected file /tmp/sw/test2_19.idx size: 255
I1014 12:29:59.207125 volume_loading.go:119 open to write file /tmp/sw/test4_26.idx
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x5260a4c]

goroutine 287 [running]:
github.com/seaweedfs/seaweedfs/weed/storage.(*SortedFileNeedleMap).Close(0x0)
        /Users/tochka/GolandProjects/seaweedfs/weed/storage/needle_map_sorted_file.go:97 +0x2c
github.com/seaweedfs/seaweedfs/weed/storage.(*Volume).load.func1()
        /Users/tochka/GolandProjects/seaweedfs/weed/storage/volume_loading.go:32 +0x8e
github.com/seaweedfs/seaweedfs/weed/storage.(*Volume).load(0xc001b36280, 0x1, 0x1, 0x0, 0x69228c0?)
        /Users/tochka/GolandProjects/seaweedfs/weed/storage/volume_loading.go:205 +0x256c
github.com/seaweedfs/seaweedfs/weed/storage.NewVolume({0x7ffeefbff6e0, 0x7}, {0x7ffeefbff6e0, 0x7}, {0xc0009a9284, 0x5}, 0x13, 0x0, 0x0, 0x0, ...)
        /Users/tochka/GolandProjects/seaweedfs/weed/storage/volume.go:62 +0x30f
github.com/seaweedfs/seaweedfs/weed/storage.(*DiskLocation).loadExistingVolume(0xc0006f40c0, {0x846c8d0, 0xc0009ce600}, 0x0?, 0x1)
        /Users/tochka/GolandProjects/seaweedfs/weed/storage/disk_location.go:161 +0x4da
github.com/seaweedfs/seaweedfs/weed/storage.(*DiskLocation).concurrentLoadingVolumes.func2()
        /Users/tochka/GolandProjects/seaweedfs/weed/storage/disk_location.go:201 +0xf9
created by github.com/seaweedfs/seaweedfs/weed/storage.(*DiskLocation).concurrentLoadingVolumes
        /Users/tochka/GolandProjects/seaweedfs/weed/storage/disk_location.go:198 +0x150
pull/3865/head
Konstantin Lebedev 2 years ago
committed by GitHub
parent
commit
e20f0dbd2d
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 20
      weed/pb/filer_pb/filer_client_bfs.go
  2. 3
      weed/storage/needle_map_sorted_file.go

20
weed/pb/filer_pb/filer_client_bfs.go

@ -9,21 +9,22 @@ import (
) )
func TraverseBfs(filerClient FilerClient, parentPath util.FullPath, fn func(parentPath util.FullPath, entry *Entry)) (err error) { func TraverseBfs(filerClient FilerClient, parentPath util.FullPath, fn func(parentPath util.FullPath, entry *Entry)) (err error) {
K := 5 K := 5
var jobQueueWg sync.WaitGroup var jobQueueWg sync.WaitGroup
queue := util.NewQueue() queue := util.NewQueue()
jobQueueWg.Add(1) jobQueueWg.Add(1)
queue.Enqueue(parentPath) queue.Enqueue(parentPath)
var isTerminating bool
terminates := make([]chan bool, K)
for i := 0; i < K; i++ { for i := 0; i < K; i++ {
go func() {
terminates[i] = make(chan bool)
go func(j int) {
for { for {
if isTerminating {
break
}
select {
case <-terminates[j]:
return
default:
t := queue.Dequeue() t := queue.Dequeue()
if t == nil { if t == nil {
time.Sleep(329 * time.Millisecond) time.Sleep(329 * time.Millisecond)
@ -36,10 +37,13 @@ func TraverseBfs(filerClient FilerClient, parentPath util.FullPath, fn func(pare
} }
jobQueueWg.Done() jobQueueWg.Done()
} }
}()
}
}(i)
} }
jobQueueWg.Wait() jobQueueWg.Wait()
isTerminating = true
for i := 0; i < K; i++ {
close(terminates[i])
}
return return
} }

3
weed/storage/needle_map_sorted_file.go

@ -94,6 +94,9 @@ func (m *SortedFileNeedleMap) Delete(key NeedleId, offset Offset) error {
} }
func (m *SortedFileNeedleMap) Close() { func (m *SortedFileNeedleMap) Close() {
if m == nil {
return
}
if m.indexFile != nil { if m.indexFile != nil {
m.indexFile.Close() m.indexFile.Close()
} }

Loading…
Cancel
Save