From 1b13324fb79a8dcbc00044b90c357c41c2cc70c7 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 9 Dec 2025 19:03:15 -0800 Subject: [PATCH] fix: skip log files with deleted volumes in filer backup (#7692) fix: skip log files with deleted volumes in filer backup (#3720) When filer.backup or filer.meta.backup resumes after being stopped, it may encounter persisted log files stored on volumes that have since been deleted (via volume.deleteEmpty -force). Previously, this caused the backup to get stuck in an infinite retry loop with 'volume X not found' errors. This fix catches 'volume not found' errors when reading log files and skips the problematic file instead of failing. The backup will now: - Log a warning about the missing volume - Skip the problematic log file - Continue with the next log file, allowing progress The VolumeNotFoundPattern regex was already defined but never used - this change puts it to use. Fixes #3720 --- weed/filer/filer_notify.go | 14 ++++++- weed/filer/filer_notify_read.go | 69 +++++++++++++++++++++++++++------ weed/filer/stream.go | 10 ++++- 3 files changed, 78 insertions(+), 15 deletions(-) diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go index 45c9b070f..6fd595f87 100644 --- a/weed/filer/filer_notify.go +++ b/weed/filer/filer_notify.go @@ -152,9 +152,21 @@ func (f *Filer) logFlushFunc(logBuffer *log_buffer.LogBuffer, startTime, stopTim } var ( - VolumeNotFoundPattern = regexp.MustCompile(`volume \d+? not found`) + volumeNotFoundPattern = regexp.MustCompile(`volume \d+? not found`) + chunkNotFoundPattern = regexp.MustCompile(`(urls not found|File Not Found)`) ) +// isChunkNotFoundError checks if the error indicates that a volume or chunk +// has been deleted and is no longer available. These errors can be skipped +// when reading persisted log files since the data is unrecoverable. +func isChunkNotFoundError(err error) bool { + if err == nil { + return false + } + errMsg := err.Error() + return volumeNotFoundPattern.MatchString(errMsg) || chunkNotFoundPattern.MatchString(errMsg) +} + func (f *Filer) ReadPersistedLogBuffer(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastTsNs int64, isDone bool, err error) { visitor, visitErr := f.collectPersistedLogBuffer(startPosition, stopTsNs) diff --git a/weed/filer/filer_notify_read.go b/weed/filer/filer_notify_read.go index 62cede687..a46f3fb71 100644 --- a/weed/filer/filer_notify_read.go +++ b/weed/filer/filer_notify_read.go @@ -265,11 +265,12 @@ func (c *LogFileEntryCollector) collectMore(v *OrderedLogVisitor) (err error) { // ---------- type LogFileQueueIterator struct { - q *util.Queue[*LogFileEntry] - masterClient *wdclient.MasterClient - startTsNs int64 - stopTsNs int64 - currentFileIterator *LogFileIterator + q *util.Queue[*LogFileEntry] + masterClient *wdclient.MasterClient + startTsNs int64 + stopTsNs int64 + pendingEntries []*filer_pb.LogEntry + pendingIndex int } func newLogFileQueueIterator(masterClient *wdclient.MasterClient, q *util.Queue[*LogFileEntry], startTsNs, stopTsNs int64) *LogFileQueueIterator { @@ -284,13 +285,17 @@ func newLogFileQueueIterator(masterClient *wdclient.MasterClient, q *util.Queue[ // getNext will return io.EOF when done func (iter *LogFileQueueIterator) getNext(v *OrderedLogVisitor) (logEntry *filer_pb.LogEntry, err error) { for { - if iter.currentFileIterator != nil { - logEntry, err = iter.currentFileIterator.getNext() - if err != io.EOF { - return - } + // return pending entries first + if iter.pendingIndex < len(iter.pendingEntries) { + logEntry = iter.pendingEntries[iter.pendingIndex] + iter.pendingIndex++ + return logEntry, nil } - // now either iter.currentFileIterator is nil or err is io.EOF + // reset for next file + iter.pendingEntries = nil + iter.pendingIndex = 0 + + // read entries from next file if iter.q.Len() == 0 { return nil, io.EOF } @@ -313,7 +318,38 @@ func (iter *LogFileQueueIterator) getNext(v *OrderedLogVisitor) (logEntry *filer if next != nil && next.TsNs <= iter.startTsNs { continue } - iter.currentFileIterator = newLogFileIterator(iter.masterClient, t.FileEntry, iter.startTsNs, iter.stopTsNs) + + // read all entries from this file + iter.pendingEntries, err = iter.readFileEntries(t.FileEntry) + if err != nil { + return nil, err + } + } +} + +// readFileEntries reads all log entries from a single file +func (iter *LogFileQueueIterator) readFileEntries(fileEntry *Entry) (entries []*filer_pb.LogEntry, err error) { + fileIterator := newLogFileIterator(iter.masterClient, fileEntry, iter.startTsNs, iter.stopTsNs) + defer func() { + if closeErr := fileIterator.Close(); closeErr != nil && err == nil { + err = closeErr + } + }() + + for { + logEntry, err := fileIterator.getNext() + if err == io.EOF { + return entries, nil + } + if err != nil { + if isChunkNotFoundError(err) { + // Volume or chunk was deleted, skip the rest of this log file + glog.Warningf("skipping rest of %s: %v", fileIterator.filePath, err) + return entries, nil + } + return nil, err + } + entries = append(entries, logEntry) } } @@ -324,6 +360,7 @@ type LogFileIterator struct { sizeBuf []byte startTsNs int64 stopTsNs int64 + filePath string } func newLogFileIterator(masterClient *wdclient.MasterClient, fileEntry *Entry, startTsNs, stopTsNs int64) *LogFileIterator { @@ -332,9 +369,17 @@ func newLogFileIterator(masterClient *wdclient.MasterClient, fileEntry *Entry, s sizeBuf: make([]byte, 4), startTsNs: startTsNs, stopTsNs: stopTsNs, + filePath: string(fileEntry.FullPath), } } +func (iter *LogFileIterator) Close() error { + if r, ok := iter.r.(io.Closer); ok { + return r.Close() + } + return nil +} + // getNext will return io.EOF when done func (iter *LogFileIterator) getNext() (logEntry *filer_pb.LogEntry, err error) { var n int diff --git a/weed/filer/stream.go b/weed/filer/stream.go index b2ee00555..00539ca20 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -245,6 +245,7 @@ type ChunkStreamReader struct { var _ = io.ReadSeeker(&ChunkStreamReader{}) var _ = io.ReaderAt(&ChunkStreamReader{}) +var _ = io.Closer(&ChunkStreamReader{}) func doNewChunkStreamReader(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader { @@ -403,8 +404,13 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { return nil } -func (c *ChunkStreamReader) Close() { - // TODO try to release and reuse buffer +func (c *ChunkStreamReader) Close() error { + c.bufferLock.Lock() + defer c.bufferLock.Unlock() + c.buffer = nil + c.head = nil + c.chunkView = nil + return nil } func VolumeId(fileId string) string {