
Add TraverseBfsWithContext and fix race conditions in error handling

- Add TraverseBfsWithContext function to support context cancellation
- Fix race condition in doTraverseBfsAndSaving using atomic.Bool and sync.Once
- Improve error handling with fail-fast behavior and proper error propagation
- Update command_volume_fsck to use error-returning saveFn callback
- Enhance error messages in readFilerFileIdFile with detailed context
pull/8015/head
Jaehoon Kim, 4 days ago
commit bc03599011
5 changed files:
  1. weed/pb/filer_pb/filer_client_bfs.go (88)
  2. weed/server/master_grpc_server_raft.go (43)
  3. weed/shell/command_fs_meta_save.go (60)
  4. weed/shell/command_fs_verify.go (3)
  5. weed/shell/command_volume_fsck.go (25)

weed/pb/filer_pb/filer_client_bfs.go

@@ -13,57 +13,97 @@ import (
 )
 
 func TraverseBfs(filerClient FilerClient, parentPath util.FullPath, fn func(parentPath util.FullPath, entry *Entry)) (err error) {
+	return TraverseBfsWithContext(context.Background(), filerClient, parentPath, func(parentPath util.FullPath, entry *Entry) error {
+		fn(parentPath, entry)
+		return nil
+	})
+}
+
+func TraverseBfsWithContext(ctx context.Context, filerClient FilerClient, parentPath util.FullPath, fn func(parentPath util.FullPath, entry *Entry) error) (err error) {
 
 	K := 5
 
-	var jobQueueWg sync.WaitGroup
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
 	queue := util.NewQueue[util.FullPath]()
-	jobQueueWg.Add(1)
+	var pending sync.WaitGroup
+	pending.Add(1)
 	queue.Enqueue(parentPath)
-	terminates := make([]chan bool, K)
+
+	var once sync.Once
+	var firstErr error
+
+	enqueue := func(p util.FullPath) bool {
+		// Stop expanding traversal once canceled (e.g. first error encountered).
+		if ctx.Err() != nil {
+			return false
+		}
+		pending.Add(1)
+		queue.Enqueue(p)
+		return true
+	}
+
+	done := make(chan struct{})
+	var workers sync.WaitGroup
 	for i := 0; i < K; i++ {
-		terminates[i] = make(chan bool)
-		go func(j int) {
+		workers.Add(1)
+		go func() {
+			defer workers.Done()
 			for {
 				select {
-				case <-terminates[j]:
+				case <-done:
 					return
 				default:
-					t := queue.Dequeue()
-					if t == "" {
-						time.Sleep(329 * time.Millisecond)
-						continue
-					}
-					dir := t
-					processErr := processOneDirectory(filerClient, dir, queue, &jobQueueWg, fn)
-					if processErr != nil {
-						err = processErr
-					}
-					jobQueueWg.Done()
+					dir := queue.Dequeue()
+					if dir == "" {
+						// queue is empty for now
+						select {
+						case <-done:
+							return
+						case <-time.After(50 * time.Millisecond):
+							continue
+						}
+					}
+					// Always mark the directory as done so the closer can finish.
+					if ctx.Err() == nil {
+						processErr := processOneDirectory(ctx, filerClient, dir, enqueue, fn)
+						if processErr != nil {
+							once.Do(func() {
+								firstErr = processErr
+								cancel()
+							})
+						}
+					}
+					pending.Done()
 				}
 			}
-		}(i)
+		}()
 	}
-	jobQueueWg.Wait()
-	for i := 0; i < K; i++ {
-		close(terminates[i])
-	}
-	return
+	pending.Wait()
+	close(done)
+	workers.Wait()
+	return firstErr
 }
 
-func processOneDirectory(filerClient FilerClient, parentPath util.FullPath, queue *util.Queue[util.FullPath], jobQueueWg *sync.WaitGroup, fn func(parentPath util.FullPath, entry *Entry)) (err error) {
+func processOneDirectory(ctx context.Context, filerClient FilerClient, parentPath util.FullPath, enqueue func(p util.FullPath) bool, fn func(parentPath util.FullPath, entry *Entry) error) (err error) {
 
-	return ReadDirAllEntries(context.Background(), filerClient, parentPath, "", func(entry *Entry, isLast bool) error {
+	return ReadDirAllEntries(ctx, filerClient, parentPath, "", func(entry *Entry, isLast bool) error {
 
-		fn(parentPath, entry)
+		if err := fn(parentPath, entry); err != nil {
+			return err
+		}
 
 		if entry.IsDirectory {
 			subDir := fmt.Sprintf("%s/%s", parentPath, entry.Name)
 			if parentPath == "/" {
 				subDir = "/" + entry.Name
 			}
-			jobQueueWg.Add(1)
-			queue.Enqueue(util.FullPath(subDir))
+			enqueue(util.FullPath(subDir))
 		}
 		return nil
 	})
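
Note: a minimal usage sketch of the new TraverseBfsWithContext API (hypothetical caller, not part of this change). The callback is invoked from several worker goroutines (K = 5), hence the atomic counter, and returning a non-nil error or canceling the context stops the traversal early.

// Hypothetical caller; assumes an existing filer_pb.FilerClient named client.
// imports: context, fmt, time, sync/atomic, weed/pb/filer_pb, weed/util
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

var fileCount int64
err := filer_pb.TraverseBfsWithContext(ctx, client, util.FullPath("/buckets"),
	func(parentPath util.FullPath, entry *filer_pb.Entry) error {
		if !entry.IsDirectory {
			atomic.AddInt64(&fileCount, 1) // callback runs concurrently on worker goroutines
		}
		return nil // a non-nil error cancels the whole traversal and is returned to the caller
	})
if err != nil {
	fmt.Printf("traversal stopped early after %d files: %v\n", fileCount, err)
}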

weed/server/master_grpc_server_raft.go

@@ -14,22 +14,47 @@ func (ms *MasterServer) RaftListClusterServers(ctx context.Context, req *master_
 	resp := &master_pb.RaftListClusterServersResponse{}
 
 	ms.Topo.RaftServerAccessLock.RLock()
-	if ms.Topo.HashicorpRaft == nil {
+	if ms.Topo.HashicorpRaft == nil && ms.Topo.RaftServer == nil {
 		ms.Topo.RaftServerAccessLock.RUnlock()
 		return resp, nil
 	}
 
-	servers := ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers
-	_, leaderId := ms.Topo.HashicorpRaft.LeaderWithID()
-	ms.Topo.RaftServerAccessLock.RUnlock()
+	if ms.Topo.HashicorpRaft != nil {
+		servers := ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers
+		_, leaderId := ms.Topo.HashicorpRaft.LeaderWithID()
+		ms.Topo.RaftServerAccessLock.RUnlock()
 
-	for _, server := range servers {
-		resp.ClusterServers = append(resp.ClusterServers, &master_pb.RaftListClusterServersResponse_ClusterServers{
-			Id:       string(server.ID),
-			Address:  string(server.Address),
-			Suffrage: server.Suffrage.String(),
-			IsLeader: server.ID == leaderId,
-		})
+		for _, server := range servers {
+			resp.ClusterServers = append(resp.ClusterServers, &master_pb.RaftListClusterServersResponse_ClusterServers{
+				Id:       string(server.ID),
+				Address:  string(server.Address),
+				Suffrage: server.Suffrage.String(),
+				IsLeader: server.ID == leaderId,
+			})
+		}
+	} else if ms.Topo.RaftServer != nil {
+		peers := ms.Topo.RaftServer.Peers()
+		leader := ms.Topo.RaftServer.Leader()
+		currentServerName := ms.Topo.RaftServer.Name()
+		ms.Topo.RaftServerAccessLock.RUnlock()
+
+		// Add the current server itself (Peers() only returns other peers)
+		resp.ClusterServers = append(resp.ClusterServers, &master_pb.RaftListClusterServersResponse_ClusterServers{
+			Id:       currentServerName,
+			Address:  string(ms.option.Master),
+			Suffrage: "Voter",
+			IsLeader: currentServerName == leader,
+		})
+
+		// Add all other peers
+		for _, peer := range peers {
+			resp.ClusterServers = append(resp.ClusterServers, &master_pb.RaftListClusterServersResponse_ClusterServers{
+				Id:       peer.Name,
+				Address:  peer.ConnectionString,
+				Suffrage: "Voter",
+				IsLeader: peer.Name == leader,
+			})
+		}
 	}
+
 	return resp, nil
 }
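
Note on the locking here: both branches snapshot what they need from the Raft state (server list, leader, own name) while RaftServerAccessLock is held and release the lock before building the response. A stripped-down sketch of that pattern, with placeholder types rather than the actual SeaweedFS structs:

// Placeholder example of the snapshot-under-RLock pattern used above.
// imports: sync
type cluster struct {
	mu     sync.RWMutex
	peers  []string
	leader string
}

func (c *cluster) list() []string {
	c.mu.RLock()
	peers := append([]string(nil), c.peers...) // copy while the lock is held
	leader := c.leader
	c.mu.RUnlock() // release before doing any further work

	out := make([]string, 0, len(peers))
	for _, p := range peers {
		if p == leader {
			p += " (leader)"
		}
		out = append(out, p)
	}
	return out
}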

weed/shell/command_fs_meta_save.go

@@ -118,14 +118,21 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
 		outputChan <- bytes
 		return nil
-	}, func(outputChan chan interface{}) {
+	}, func(outputChan chan interface{}) error {
 		sizeBuf := make([]byte, 4)
 		for item := range outputChan {
 			b := item.([]byte)
 			util.Uint32toBytes(sizeBuf, uint32(len(b)))
-			dst.Write(sizeBuf)
-			dst.Write(b)
+			_, err := dst.Write(sizeBuf)
+			if err != nil {
+				return err
+			}
+			_, err = dst.Write(b)
+			if err != nil {
+				return err
+			}
 		}
+		return nil
 	})
 
 	if err == nil {
@@ -136,17 +143,21 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
 }
 
-func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, path string, verbose bool, genFn func(entry *filer_pb.FullEntry, outputChan chan interface{}) error, saveFn func(outputChan chan interface{})) error {
+func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, path string, verbose bool, genFn func(entry *filer_pb.FullEntry, outputChan chan interface{}) error, saveFn func(outputChan chan interface{}) error) error {
 
 	var wg sync.WaitGroup
 	wg.Add(1)
 	outputChan := make(chan interface{}, 1024)
+	saveErrChan := make(chan error, 1)
 	go func() {
-		saveFn(outputChan)
+		saveErrChan <- saveFn(outputChan)
 		wg.Done()
 	}()
 
 	var dirCount, fileCount uint64
+	var once sync.Once
+	var firstErr error
+	var hasErr atomic.Bool
 
 	// also save the directory itself (path) if it exists in the filer
 	if e, getErr := filer_pb.GetEntry(context.Background(), filerClient, util.FullPath(path)); getErr != nil {
@@ -160,8 +171,16 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer,
 			Dir:   parentDir,
 			Entry: e,
 		}
+		if hasErr.Load() {
+			// fail-fast: do not continue emitting partial results after first error
+			return firstErr
+		}
 		if genErr := genFn(protoMessage, outputChan); genErr != nil {
-			fmt.Fprintf(writer, "marshall error: %v\n", genErr)
+			once.Do(func() {
+				firstErr = genErr
+				hasErr.Store(true)
+			})
+			return genErr
 		} else {
 			if e.IsDirectory {
 				atomic.AddUint64(&dirCount, 1)
@@ -171,10 +190,12 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer,
 		}
 	}
 
-	err := filer_pb.TraverseBfs(filerClient, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	err := filer_pb.TraverseBfsWithContext(ctx, filerClient, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) error {
 		if strings.HasPrefix(string(parentPath), filer.SystemLogDir) {
-			return
+			return nil
 		}
 
 		protoMessage := &filer_pb.FullEntry{
@@ -182,9 +203,17 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer,
 			Entry: entry,
 		}
 
-		if err := genFn(protoMessage, outputChan); err != nil {
-			fmt.Fprintf(writer, "marshall error: %v\n", err)
-			return
+		if hasErr.Load() {
+			// fail-fast: stop traversal once an error is observed.
+			return firstErr
+		}
+		if genErr := genFn(protoMessage, outputChan); genErr != nil {
+			once.Do(func() {
+				firstErr = genErr
+				hasErr.Store(true)
+				cancel()
+			})
+			return genErr
 		}
 
 		if entry.IsDirectory {
@@ -197,11 +226,20 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer,
 			println(parentPath.Child(entry.Name))
 		}
 
+		return nil
 	})
 
 	close(outputChan)
 
 	wg.Wait()
+	saveErr := <-saveErrChan
+
+	if firstErr != nil {
+		return firstErr
+	}
+	if saveErr != nil {
+		return saveErr
+	}
 
 	if err == nil && writer != nil {
 		fmt.Fprintf(writer, "total %d directories, %d files\n", dirCount, fileCount)
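
Note: previously genFn failures were only printed to the writer and traversal continued, and any write error inside the saver goroutine was dropped; with several traversal workers there was also no race-free place to record a shared first error. The new shape records only the first error with sync.Once, lets later callbacks bail out via an atomic.Bool, and returns the saver's result over a buffered channel. A stripped-down sketch of that pattern with placeholder produce/save functions (save is expected to drain its channel until it is closed, as the savers above do):

// Placeholder sketch of the first-error, fail-fast pattern used in doTraverseBfsAndSaving.
// imports: sync, sync/atomic
func run(items []int, produce func(int) error, save func(<-chan int) error) error {
	out := make(chan int, 1024)
	saveErrChan := make(chan error, 1)
	go func() { saveErrChan <- save(out) }()

	var (
		once     sync.Once
		firstErr error
		hasErr   atomic.Bool
		workers  sync.WaitGroup
	)
	for _, it := range items {
		workers.Add(1)
		go func(it int) {
			defer workers.Done()
			if hasErr.Load() { // fail fast once an error has been recorded
				return
			}
			if err := produce(it); err != nil {
				once.Do(func() { // only the first error wins, without data races
					firstErr = err
					hasErr.Store(true)
				})
				return
			}
			out <- it
		}(it)
	}
	workers.Wait()
	close(out) // lets the saver finish ranging over the channel and return
	saveErr := <-saveErrChan

	if firstErr != nil { // safe to read: all writers finished before workers.Wait returned
		return firstErr
	}
	return saveErr
}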

weed/shell/command_fs_verify.go

@@ -300,7 +300,7 @@ func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount uint64, errC
 			}
 			return nil
 		},
-		func(outputChan chan interface{}) {
+		func(outputChan chan interface{}) error {
 			var wg sync.WaitGroup
 			itemErrCount := atomic.NewUint64(0)
 			for itemEntry := range outputChan {
@@ -315,5 +315,6 @@ func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount uint64, errC
 			}
 			wg.Wait()
 			errCount = itemErrCount.Load()
+			return nil
 		})
 }

weed/shell/command_volume_fsck.go

@@ -266,7 +266,7 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m
 			}
 			return nil
 		},
-		func(outputChan chan interface{}) {
+		func(outputChan chan interface{}) error {
 			buffer := make([]byte, readbufferSize)
 			for item := range outputChan {
 				i := item.(*Item)
@@ -274,8 +274,12 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m
 					util.Uint64toBytes(buffer, i.fileKey)
 					util.Uint32toBytes(buffer[8:], i.cookie)
 					util.Uint32toBytes(buffer[12:], uint32(len(i.path)))
-					f.Write(buffer)
-					f.Write([]byte(i.path))
+					if _, err := f.Write(buffer); err != nil {
+						return err
+					}
+					if _, err := f.Write([]byte(i.path)); err != nil {
+						return err
+					}
 				} else if *c.findMissingChunksInFiler && len(c.volumeIds) == 0 {
 					fmt.Fprintf(c.writer, "%d,%x%08x %s volume not found\n", i.vid, i.fileKey, i.cookie, i.path)
 					if purgeAbsent {
@@ -284,6 +288,7 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m
 					}
 				}
 			}
+			return nil
 		})
 }
@@ -482,10 +487,10 @@ func (c *commandVolumeFsck) readFilerFileIdFile(volumeId uint32, fn func(needleI
 			break
 		}
 		if readErr != nil {
-			return readErr
+			return fmt.Errorf("read fid header for volume %d: %w", volumeId, readErr)
 		}
 		if readSize != readbufferSize {
-			return fmt.Errorf("readSize mismatch")
+			return fmt.Errorf("read fid header size mismatch for volume %d: got %d want %d", volumeId, readSize, readbufferSize)
 		}
 		item.fileKey = util.BytesToUint64(buffer[:8])
 		item.cookie = util.BytesToUint32(buffer[8:12])
@@ -493,10 +498,10 @@ func (c *commandVolumeFsck) readFilerFileIdFile(volumeId uint32, fn func(needleI
 		pathBytes := make([]byte, int(pathSize))
 		n, err := io.ReadFull(br, pathBytes)
 		if err != nil {
-			fmt.Fprintf(c.writer, "%d,%x%08x in unexpected error: %v\n", volumeId, item.fileKey, item.cookie, err)
+			return fmt.Errorf("read fid path for volume %d,%x%08x: %w", volumeId, item.fileKey, item.cookie, err)
 		}
 		if n != int(pathSize) {
-			fmt.Fprintf(c.writer, "%d,%x%08x %d unexpected file name size %d\n", volumeId, item.fileKey, item.cookie, pathSize, n)
+			return fmt.Errorf("read fid path size mismatch for volume %d,%x%08x: got %d want %d", volumeId, item.fileKey, item.cookie, n, pathSize)
 		}
 		item.path = util.FullPath(pathBytes)
 		needleId := types.NeedleId(item.fileKey)
@@ -748,10 +753,10 @@ func writeToFile(bytes []byte, fileName string) error {
 	flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
 	dst, err := os.OpenFile(fileName, flags, 0644)
 	if err != nil {
-		return nil
+		return err
 	}
 	defer dst.Close()
 
-	dst.Write(bytes)
-	return nil
+	_, err = dst.Write(bytes)
+	return err
 }
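
Note on the fid file errors above: each record written by collectFilerFileIdAndPaths is a fixed-size header of readbufferSize bytes (8-byte fileKey, 4-byte cookie, 4-byte path length, so 16 bytes in this layout) followed by the path bytes, which is what readFilerFileIdFile parses back. A hedged reader sketch for one record under that assumption (util.BytesToUint64/BytesToUint32 are the existing SeaweedFS helpers used in the diff; readOneRecord itself is hypothetical):

// imports: bufio, fmt, io, github.com/seaweedfs/seaweedfs/weed/util
func readOneRecord(br *bufio.Reader, volumeId uint32) (fileKey uint64, cookie uint32, path string, err error) {
	header := make([]byte, 16) // 8-byte fileKey | 4-byte cookie | 4-byte path length
	if _, err = io.ReadFull(br, header); err != nil {
		return 0, 0, "", fmt.Errorf("read fid header for volume %d: %w", volumeId, err)
	}
	fileKey = util.BytesToUint64(header[:8])
	cookie = util.BytesToUint32(header[8:12])
	pathSize := util.BytesToUint32(header[12:16])

	pathBytes := make([]byte, pathSize)
	if _, err = io.ReadFull(br, pathBytes); err != nil {
		// %w keeps the underlying io error inspectable with errors.Is / errors.As
		return 0, 0, "", fmt.Errorf("read fid path for volume %d,%x%08x: %w", volumeId, fileKey, cookie, err)
	}
	return fileKey, cookie, string(pathBytes), nil
}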