From 29bdbb3c487e6d924135ce1c73e5833a79332915 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 25 Mar 2026 15:43:25 -0700 Subject: [PATCH] filer.sync: replace O(n) conflict check with O(depth) index lookups (#8772) * filer.sync: replace O(n) conflict check with O(depth) index lookups The MetadataProcessor.conflictsWith() scanned all active jobs linearly for every new event dispatch. At high concurrency (256-1024), this O(n) scan under the activeJobsLock became a bottleneck that throttled the event dispatch pipeline, negating the benefit of higher -concurrency values. Replace the linear scan with three index maps: - activeFilePaths: O(1) exact file path lookup - activeDirPaths: O(1) directory path lookup per ancestor - descendantCount: O(1) check for active jobs under a directory Conflict check is now O(depth) where depth is the path depth (typically 3-6 levels), constant regardless of active job count. Benchmark confirms ~81ns per check whether there are 32 or 1024 active jobs. Also replace the O(n) watermark scan with minActiveTs tracking so non-oldest job completions are O(1). Ref: #8771 * filer.sync: replace O(n) watermark rescan with min-heap lazy deletion Address review feedback: - Replace minActiveTs O(n) rescan with a tsMinHeap using lazy deletion. Each TsNs is pushed once and popped once, giving O(log n) amortized watermark tracking regardless of completion order. - Fix benchmark to consume conflictsWith result via package-level sink variable to prevent compiler elision. The watermark advancement semantics (conservative, sets to completing job's TsNs) are unchanged from the original code. This is intentionally safe for idempotent replay on restart. --- weed/command/filer_sync_jobs.go | 234 +++++++++++----- weed/command/filer_sync_jobs_test.go | 387 +++++++++++++++++++++++++++ 2 files changed, 551 insertions(+), 70 deletions(-) create mode 100644 weed/command/filer_sync_jobs_test.go diff --git a/weed/command/filer_sync_jobs.go b/weed/command/filer_sync_jobs.go index 1c75eb0e1..b3902382b 100644 --- a/weed/command/filer_sync_jobs.go +++ b/weed/command/filer_sync_jobs.go @@ -1,6 +1,8 @@ package command import ( + "container/heap" + "path" "sync" "sync/atomic" @@ -10,26 +12,154 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" ) +// tsMinHeap implements heap.Interface for int64 timestamps. +type tsMinHeap []int64 + +func (h tsMinHeap) Len() int { return len(h) } +func (h tsMinHeap) Less(i, j int) bool { return h[i] < h[j] } +func (h tsMinHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h *tsMinHeap) Push(x any) { *h = append(*h, x.(int64)) } +func (h *tsMinHeap) Pop() any { + old := *h + n := len(old) + x := old[n-1] + *h = old[:n-1] + return x +} + +type syncJobPaths struct { + path util.FullPath + newPath util.FullPath // empty for non-renames + isDirectory bool +} + type MetadataProcessor struct { - activeJobs map[int64]*filer_pb.SubscribeMetadataResponse - activeJobsLock sync.Mutex - activeJobsCond *sync.Cond - concurrencyLimit int - fn pb.ProcessMetadataFunc + activeJobs map[int64]*syncJobPaths + activeJobsLock sync.Mutex + activeJobsCond *sync.Cond + concurrencyLimit int + fn pb.ProcessMetadataFunc processedTsWatermark atomic.Int64 + + // Indexes for O(depth) conflict detection, replacing O(n) linear scan. + // activeFilePaths counts active file jobs at each exact path. + activeFilePaths map[util.FullPath]int + // activeDirPaths counts active directory jobs at each exact path. + activeDirPaths map[util.FullPath]int + // descendantCount counts active jobs (file or dir) strictly under each directory. + descendantCount map[util.FullPath]int + + // tsHeap is a min-heap of active job timestamps with lazy deletion, + // used for O(log n) amortized watermark tracking. + tsHeap tsMinHeap } func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int, offsetTsNs int64) *MetadataProcessor { t := &MetadataProcessor{ fn: fn, - activeJobs: make(map[int64]*filer_pb.SubscribeMetadataResponse), + activeJobs: make(map[int64]*syncJobPaths), concurrencyLimit: concurrency, + activeFilePaths: make(map[util.FullPath]int), + activeDirPaths: make(map[util.FullPath]int), + descendantCount: make(map[util.FullPath]int), } t.processedTsWatermark.Store(offsetTsNs) t.activeJobsCond = sync.NewCond(&t.activeJobsLock) return t } +// pathAncestors returns all proper ancestor directories of p. +// For "/a/b/c", returns ["/a/b", "/a", "/"]. +func pathAncestors(p util.FullPath) []util.FullPath { + var ancestors []util.FullPath + s := string(p) + for { + parent := path.Dir(s) + if parent == s { + break + } + ancestors = append(ancestors, util.FullPath(parent)) + s = parent + } + return ancestors +} + +// addPathToIndex registers a path in the conflict detection indexes. +// Must be called under activeJobsLock. +func (t *MetadataProcessor) addPathToIndex(p util.FullPath, isDirectory bool) { + if isDirectory { + t.activeDirPaths[p]++ + } else { + t.activeFilePaths[p]++ + } + for _, ancestor := range pathAncestors(p) { + t.descendantCount[ancestor]++ + } +} + +// removePathFromIndex unregisters a path from the conflict detection indexes. +// Must be called under activeJobsLock. +func (t *MetadataProcessor) removePathFromIndex(p util.FullPath, isDirectory bool) { + if isDirectory { + if t.activeDirPaths[p] <= 1 { + delete(t.activeDirPaths, p) + } else { + t.activeDirPaths[p]-- + } + } else { + if t.activeFilePaths[p] <= 1 { + delete(t.activeFilePaths, p) + } else { + t.activeFilePaths[p]-- + } + } + for _, ancestor := range pathAncestors(p) { + if t.descendantCount[ancestor] <= 1 { + delete(t.descendantCount, ancestor) + } else { + t.descendantCount[ancestor]-- + } + } +} + +// pathConflicts checks if a single path conflicts with any active job. +// Conflict rules match pairShouldWaitFor: +// - file vs file: exact same path +// - file vs dir: file.IsUnder(dir) +// - dir vs file: file.IsUnder(dir) +// - dir vs dir: either IsUnder the other +func (t *MetadataProcessor) pathConflicts(p util.FullPath, isDirectory bool) bool { + if isDirectory { + // Any active job (file or dir) strictly under this directory? + if t.descendantCount[p] > 0 { + return true + } + } else { + // Exact same file already active? + if t.activeFilePaths[p] > 0 { + return true + } + } + // Any active directory that is a proper ancestor of p? + for _, ancestor := range pathAncestors(p) { + if t.activeDirPaths[ancestor] > 0 { + return true + } + } + return false +} + +func (t *MetadataProcessor) conflictsWith(resp *filer_pb.SubscribeMetadataResponse) bool { + p, newPath, isDirectory := extractPathsFromMetadata(resp) + if t.pathConflicts(p, isDirectory) { + return true + } + if newPath != "" && t.pathConflicts(newPath, isDirectory) { + return true + } + return false +} + func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse) { if filer_pb.IsEmpty(resp) { return @@ -41,7 +171,18 @@ func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse) for len(t.activeJobs) >= t.concurrencyLimit || t.conflictsWith(resp) { t.activeJobsCond.Wait() } - t.activeJobs[resp.TsNs] = resp + + p, newPath, isDirectory := extractPathsFromMetadata(resp) + jobPaths := &syncJobPaths{path: p, newPath: newPath, isDirectory: isDirectory} + + t.activeJobs[resp.TsNs] = jobPaths + t.addPathToIndex(p, isDirectory) + if newPath != "" { + t.addPathToIndex(newPath, isDirectory) + } + + heap.Push(&t.tsHeap, resp.TsNs) + go func() { if err := util.Retry("metadata processor", func() error { @@ -54,95 +195,48 @@ func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse) defer t.activeJobsLock.Unlock() delete(t.activeJobs, resp.TsNs) + t.removePathFromIndex(jobPaths.path, jobPaths.isDirectory) + if jobPaths.newPath != "" { + t.removePathFromIndex(jobPaths.newPath, jobPaths.isDirectory) + } - // if is the oldest job, write down the watermark - isOldest := true - for t := range t.activeJobs { - if resp.TsNs > t { - isOldest = false + // Lazy-clean stale entries from heap top (already-completed jobs). + // Each entry is pushed once and popped once: O(log n) amortized. + for t.tsHeap.Len() > 0 { + if _, active := t.activeJobs[t.tsHeap[0]]; active { break } + heap.Pop(&t.tsHeap) } - if isOldest { + // If this was the oldest job, advance the watermark. + if t.tsHeap.Len() == 0 || resp.TsNs < t.tsHeap[0] { t.processedTsWatermark.Store(resp.TsNs) } t.activeJobsCond.Signal() }() } -func (t *MetadataProcessor) conflictsWith(resp *filer_pb.SubscribeMetadataResponse) bool { - for _, r := range t.activeJobs { - if shouldWaitFor(resp, r) { - return true - } - } - return false -} - -// a is one possible job to schedule -// b is one existing active job -func shouldWaitFor(a *filer_pb.SubscribeMetadataResponse, b *filer_pb.SubscribeMetadataResponse) bool { - aPath, aNewPath, aIsDirectory := extractPathsFromMetadata(a) - bPath, bNewPath, bIsDirectory := extractPathsFromMetadata(b) - - if pairShouldWaitFor(aPath, bPath, aIsDirectory, bIsDirectory) { - return true - } - if aNewPath != "" { - if pairShouldWaitFor(aNewPath, bPath, aIsDirectory, bIsDirectory) { - return true - } - } - if bNewPath != "" { - if pairShouldWaitFor(aPath, bNewPath, aIsDirectory, bIsDirectory) { - return true - } - } - if aNewPath != "" && bNewPath != "" { - if pairShouldWaitFor(aNewPath, bNewPath, aIsDirectory, bIsDirectory) { - return true - } - } - return false -} - -func pairShouldWaitFor(aPath, bPath util.FullPath, aIsDirectory, bIsDirectory bool) bool { - if bIsDirectory { - if aIsDirectory { - return aPath.IsUnder(bPath) || bPath.IsUnder(aPath) - } else { - return aPath.IsUnder(bPath) - } - } else { - if aIsDirectory { - return bPath.IsUnder(aPath) - } else { - return aPath == bPath - } - } -} - -func extractPathsFromMetadata(resp *filer_pb.SubscribeMetadataResponse) (path, newPath util.FullPath, isDirectory bool) { +func extractPathsFromMetadata(resp *filer_pb.SubscribeMetadataResponse) (p, newPath util.FullPath, isDirectory bool) { oldEntry := resp.EventNotification.OldEntry newEntry := resp.EventNotification.NewEntry // create if filer_pb.IsCreate(resp) { - path = util.FullPath(resp.Directory).Child(newEntry.Name) + p = util.FullPath(resp.Directory).Child(newEntry.Name) isDirectory = newEntry.IsDirectory return } if filer_pb.IsDelete(resp) { - path = util.FullPath(resp.Directory).Child(oldEntry.Name) + p = util.FullPath(resp.Directory).Child(oldEntry.Name) isDirectory = oldEntry.IsDirectory return } if filer_pb.IsUpdate(resp) { - path = util.FullPath(resp.Directory).Child(newEntry.Name) + p = util.FullPath(resp.Directory).Child(newEntry.Name) isDirectory = newEntry.IsDirectory return } // renaming - path = util.FullPath(resp.Directory).Child(oldEntry.Name) + p = util.FullPath(resp.Directory).Child(oldEntry.Name) isDirectory = oldEntry.IsDirectory newPath = util.FullPath(resp.EventNotification.NewParentPath).Child(newEntry.Name) return diff --git a/weed/command/filer_sync_jobs_test.go b/weed/command/filer_sync_jobs_test.go new file mode 100644 index 000000000..207712d92 --- /dev/null +++ b/weed/command/filer_sync_jobs_test.go @@ -0,0 +1,387 @@ +package command + +import ( + "container/heap" + "fmt" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +func makeResp(dir, name string, isDir bool, tsNs int64, isNew bool) *filer_pb.SubscribeMetadataResponse { + resp := &filer_pb.SubscribeMetadataResponse{ + Directory: dir, + TsNs: tsNs, + EventNotification: &filer_pb.EventNotification{}, + } + entry := &filer_pb.Entry{ + Name: name, + IsDirectory: isDir, + } + if isNew { + resp.EventNotification.NewEntry = entry + } else { + resp.EventNotification.OldEntry = entry + } + return resp +} + +func makeRenameResp(oldDir, oldName, newDir, newName string, isDir bool, tsNs int64) *filer_pb.SubscribeMetadataResponse { + return &filer_pb.SubscribeMetadataResponse{ + Directory: oldDir, + TsNs: tsNs, + EventNotification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{ + Name: oldName, + IsDirectory: isDir, + }, + NewEntry: &filer_pb.Entry{ + Name: newName, + IsDirectory: isDir, + }, + NewParentPath: newDir, + }, + } +} + +func TestPathAncestors(t *testing.T) { + tests := []struct { + path util.FullPath + expected []util.FullPath + }{ + {"/a/b/c/file.txt", []util.FullPath{"/a/b/c", "/a/b", "/a", "/"}}, + {"/a/b", []util.FullPath{"/a", "/"}}, + {"/a", []util.FullPath{"/"}}, + {"/", nil}, + } + for _, tt := range tests { + got := pathAncestors(tt.path) + if len(got) != len(tt.expected) { + t.Errorf("pathAncestors(%q) = %v, want %v", tt.path, got, tt.expected) + continue + } + for i := range got { + if got[i] != tt.expected[i] { + t.Errorf("pathAncestors(%q)[%d] = %q, want %q", tt.path, i, got[i], tt.expected[i]) + } + } + } +} + +// TestFileVsFileConflict verifies that two file operations on the same path conflict, +// and on different paths do not. +func TestFileVsFileConflict(t *testing.T) { + noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil } + p := NewMetadataProcessor(noop, 100, 0) + + // Add a file job + active := makeResp("/dir1", "file.txt", false, 1, true) + path, newPath, isDir := extractPathsFromMetadata(active) + p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir} + p.addPathToIndex(path, isDir) + + // Same file should conflict + same := makeResp("/dir1", "file.txt", false, 2, true) + if !p.conflictsWith(same) { + t.Error("expected conflict for same file path") + } + + // Different file should not conflict + diff := makeResp("/dir1", "other.txt", false, 3, true) + if p.conflictsWith(diff) { + t.Error("unexpected conflict for different file path") + } + + // File in different directory should not conflict + diffDir := makeResp("/dir2", "file.txt", false, 4, true) + if p.conflictsWith(diffDir) { + t.Error("unexpected conflict for file in different directory") + } +} + +// TestFileUnderActiveDirConflict verifies that a file under an active directory operation +// conflicts, but a file outside does not. +func TestFileUnderActiveDirConflict(t *testing.T) { + noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil } + p := NewMetadataProcessor(noop, 100, 0) + + // Add a directory job at /dir1 + active := makeResp("/", "dir1", true, 1, true) + path, newPath, isDir := extractPathsFromMetadata(active) + p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir} + p.addPathToIndex(path, isDir) + + // File under /dir1 should conflict + under := makeResp("/dir1", "file.txt", false, 2, true) + if !p.conflictsWith(under) { + t.Error("expected conflict for file under active directory") + } + + // File deeply nested under /dir1 should conflict + deep := makeResp("/dir1/sub/deep", "file.txt", false, 3, true) + if !p.conflictsWith(deep) { + t.Error("expected conflict for deeply nested file under active directory") + } + + // File in /dir2 should not conflict + outside := makeResp("/dir2", "file.txt", false, 4, true) + if p.conflictsWith(outside) { + t.Error("unexpected conflict for file outside active directory") + } + + // File at /dir1 itself (not under, at) should not conflict + // because IsUnder is strict: "/dir1".IsUnder("/dir1") == false + atSame := makeResp("/", "dir1", false, 5, true) + if p.conflictsWith(atSame) { + t.Error("unexpected conflict for file at same path as directory (IsUnder is strict)") + } +} + +// TestDirWithActiveFileUnder verifies that a directory operation conflicts when +// there are active file jobs under it. +func TestDirWithActiveFileUnder(t *testing.T) { + noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil } + p := NewMetadataProcessor(noop, 100, 0) + + // Add file jobs under /dir1 + f1 := makeResp("/dir1/sub", "file.txt", false, 1, true) + path, newPath, isDir := extractPathsFromMetadata(f1) + p.activeJobs[f1.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir} + p.addPathToIndex(path, isDir) + + // Directory /dir1 should conflict (has active file under it) + dirOp := makeResp("/", "dir1", true, 2, true) + if !p.conflictsWith(dirOp) { + t.Error("expected conflict for directory with active file under it") + } + + // Directory /dir2 should not conflict + dirOp2 := makeResp("/", "dir2", true, 3, true) + if p.conflictsWith(dirOp2) { + t.Error("unexpected conflict for directory with no active jobs under it") + } +} + +// TestDirVsDirConflict verifies ancestor/descendant directory conflict detection. +func TestDirVsDirConflict(t *testing.T) { + noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil } + p := NewMetadataProcessor(noop, 100, 0) + + // Add directory job at /a/b + active := makeResp("/a", "b", true, 1, true) + path, newPath, isDir := extractPathsFromMetadata(active) + p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir} + p.addPathToIndex(path, isDir) + + // /a/b/c (descendant) should conflict + desc := makeResp("/a/b", "c", true, 2, true) + if !p.conflictsWith(desc) { + t.Error("expected conflict for descendant directory") + } + + // /a (ancestor) should conflict + anc := makeResp("/", "a", true, 3, true) + if !p.conflictsWith(anc) { + t.Error("expected conflict for ancestor directory") + } + + // Same directory should NOT conflict (IsUnder is strict, not equal) + same := makeResp("/a", "b", true, 4, true) + if p.conflictsWith(same) { + t.Error("unexpected conflict for same directory (IsUnder is strict)") + } + + // Sibling directory should not conflict + sibling := makeResp("/a", "c", true, 5, true) + if p.conflictsWith(sibling) { + t.Error("unexpected conflict for sibling directory") + } +} + +// TestRenameConflict verifies that rename events with two paths check both paths. +func TestRenameConflict(t *testing.T) { + noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil } + p := NewMetadataProcessor(noop, 100, 0) + + // Add file job at /dir1/file.txt + f1 := makeResp("/dir1", "file.txt", false, 1, true) + path, newPath, isDir := extractPathsFromMetadata(f1) + p.activeJobs[f1.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir} + p.addPathToIndex(path, isDir) + + // Rename from /dir2/a.txt to /dir1/file.txt should conflict (newPath matches) + rename := makeRenameResp("/dir2", "a.txt", "/dir1", "file.txt", false, 2) + if !p.conflictsWith(rename) { + t.Error("expected conflict for rename whose destination matches active file") + } + + // Rename from /dir1/file.txt to /dir2/b.txt should conflict (oldPath matches) + rename2 := makeRenameResp("/dir1", "file.txt", "/dir2", "b.txt", false, 3) + if !p.conflictsWith(rename2) { + t.Error("expected conflict for rename whose source matches active file") + } + + // Rename between unrelated paths should not conflict + rename3 := makeRenameResp("/dir3", "x.txt", "/dir4", "y.txt", false, 4) + if p.conflictsWith(rename3) { + t.Error("unexpected conflict for rename between unrelated paths") + } +} + +// TestActiveRenameConflict verifies that an active rename job registers both paths. +func TestActiveRenameConflict(t *testing.T) { + noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil } + p := NewMetadataProcessor(noop, 100, 0) + + // Add active rename job: /dir1/old.txt -> /dir2/new.txt + rename := makeRenameResp("/dir1", "old.txt", "/dir2", "new.txt", false, 1) + path, newPath, isDir := extractPathsFromMetadata(rename) + p.activeJobs[rename.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir} + p.addPathToIndex(path, isDir) + if newPath != "" { + p.addPathToIndex(newPath, isDir) + } + + // File at /dir1/old.txt should conflict + f1 := makeResp("/dir1", "old.txt", false, 2, true) + if !p.conflictsWith(f1) { + t.Error("expected conflict at rename source path") + } + + // File at /dir2/new.txt should conflict + f2 := makeResp("/dir2", "new.txt", false, 3, true) + if !p.conflictsWith(f2) { + t.Error("expected conflict at rename destination path") + } + + // File at unrelated path should not conflict + f3 := makeResp("/dir3", "other.txt", false, 4, true) + if p.conflictsWith(f3) { + t.Error("unexpected conflict at unrelated path") + } +} + +// TestRootDirConflict verifies that an active job at / conflicts with everything. +func TestRootDirConflict(t *testing.T) { + noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil } + p := NewMetadataProcessor(noop, 100, 0) + + // Add directory job at / + // Note: a dir entry at "/" would be created as FullPath("/").Child("somedir") + // But let's test what happens with an active dir at /some/path and check root + active := makeResp("/some", "dir", true, 1, true) + path, newPath, isDir := extractPathsFromMetadata(active) + p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir} + p.addPathToIndex(path, isDir) + + // Root dir should conflict because active dir /some/dir is under / + // A new directory at "/" should see descendantCount["/"] > 0 + if p.descendantCount["/"] <= 0 { + t.Error("expected descendantCount['/'] > 0 for active job under root") + } +} + +// TestIndexCleanup verifies that removing a job properly cleans up all indexes. +func TestIndexCleanup(t *testing.T) { + noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil } + p := NewMetadataProcessor(noop, 100, 0) + + // Add then remove a file job + path := util.FullPath("/a/b/c/file.txt") + p.addPathToIndex(path, false) + + if p.activeFilePaths[path] != 1 { + t.Errorf("expected activeFilePaths count 1, got %d", p.activeFilePaths[path]) + } + if p.descendantCount["/a/b/c"] != 1 { + t.Errorf("expected descendantCount['/a/b/c'] = 1, got %d", p.descendantCount["/a/b/c"]) + } + + p.removePathFromIndex(path, false) + + if len(p.activeFilePaths) != 0 { + t.Errorf("expected empty activeFilePaths after removal, got %v", p.activeFilePaths) + } + if len(p.descendantCount) != 0 { + t.Errorf("expected empty descendantCount after removal, got %v", p.descendantCount) + } +} + +// TestWatermarkWithHeap verifies watermark advancement using the min-heap. +func TestWatermarkWithHeap(t *testing.T) { + noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil } + p := NewMetadataProcessor(noop, 100, 0) + + // Simulate adding jobs in order + for _, ts := range []int64{10, 20, 30} { + jobPath := util.FullPath("/file" + string(rune('0'+ts/10))) + p.activeJobs[ts] = &syncJobPaths{path: jobPath, isDirectory: false} + p.addPathToIndex(jobPath, false) + heap.Push(&p.tsHeap, ts) + } + + if p.tsHeap[0] != 10 { + t.Errorf("expected heap min=10, got %d", p.tsHeap[0]) + } + + // Remove non-oldest (ts=20) — heap top should stay 10 + delete(p.activeJobs, 20) + p.removePathFromIndex("/file2", false) + // Lazy clean: top is 10 which is still active, so no pop + for p.tsHeap.Len() > 0 { + if _, active := p.activeJobs[p.tsHeap[0]]; active { + break + } + heap.Pop(&p.tsHeap) + } + if p.tsHeap[0] != 10 { + t.Errorf("expected heap min=10 after removing 20, got %d", p.tsHeap[0]) + } + + // Remove oldest (ts=10) — lazy clean should find 30 + delete(p.activeJobs, 10) + p.removePathFromIndex("/file1", false) + for p.tsHeap.Len() > 0 { + if _, active := p.activeJobs[p.tsHeap[0]]; active { + break + } + heap.Pop(&p.tsHeap) + } + if p.tsHeap.Len() != 1 || p.tsHeap[0] != 30 { + t.Errorf("expected heap min=30 after removing 10 and 20, got len=%d", p.tsHeap.Len()) + } +} + +// benchResult prevents the compiler from optimizing away the conflict check. +var benchResult bool + +// BenchmarkConflictCheck measures conflict check cost with varying active job counts. +// With the index-based approach, cost should be O(depth) regardless of job count. +func BenchmarkConflictCheck(b *testing.B) { + for _, numJobs := range []int{32, 256, 1024} { + b.Run(fmt.Sprintf("jobs=%d", numJobs), func(b *testing.B) { + noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil } + p := NewMetadataProcessor(noop, numJobs+1, 0) + + // Fill with active jobs in different directories + for i := range numJobs { + dir := fmt.Sprintf("/dir%d/sub%d", i/100, i%100) + name := fmt.Sprintf("file%d.txt", i) + resp := makeResp(dir, name, false, int64(i+1), true) + path, newPath, isDir := extractPathsFromMetadata(resp) + p.activeJobs[resp.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir} + p.addPathToIndex(path, isDir) + } + + // Benchmark conflict check for a non-conflicting event + probe := makeResp("/other/path", "test.txt", false, int64(numJobs+1), true) + var r bool + b.ResetTimer() + for i := 0; i < b.N; i++ { + r = p.conflictsWith(probe) + } + benchResult = r + }) + } +}