diff --git a/weed/command/filer_sync_jobs.go b/weed/command/filer_sync_jobs.go index 1c75eb0e1..b3902382b 100644 --- a/weed/command/filer_sync_jobs.go +++ b/weed/command/filer_sync_jobs.go @@ -1,6 +1,8 @@ package command import ( + "container/heap" + "path" "sync" "sync/atomic" @@ -10,26 +12,154 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" ) +// tsMinHeap implements heap.Interface for int64 timestamps. +type tsMinHeap []int64 + +func (h tsMinHeap) Len() int { return len(h) } +func (h tsMinHeap) Less(i, j int) bool { return h[i] < h[j] } +func (h tsMinHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h *tsMinHeap) Push(x any) { *h = append(*h, x.(int64)) } +func (h *tsMinHeap) Pop() any { + old := *h + n := len(old) + x := old[n-1] + *h = old[:n-1] + return x +} + +type syncJobPaths struct { + path util.FullPath + newPath util.FullPath // empty for non-renames + isDirectory bool +} + type MetadataProcessor struct { - activeJobs map[int64]*filer_pb.SubscribeMetadataResponse - activeJobsLock sync.Mutex - activeJobsCond *sync.Cond - concurrencyLimit int - fn pb.ProcessMetadataFunc + activeJobs map[int64]*syncJobPaths + activeJobsLock sync.Mutex + activeJobsCond *sync.Cond + concurrencyLimit int + fn pb.ProcessMetadataFunc processedTsWatermark atomic.Int64 + + // Indexes for O(depth) conflict detection, replacing O(n) linear scan. + // activeFilePaths counts active file jobs at each exact path. + activeFilePaths map[util.FullPath]int + // activeDirPaths counts active directory jobs at each exact path. + activeDirPaths map[util.FullPath]int + // descendantCount counts active jobs (file or dir) strictly under each directory. + descendantCount map[util.FullPath]int + + // tsHeap is a min-heap of active job timestamps with lazy deletion, + // used for O(log n) amortized watermark tracking. + tsHeap tsMinHeap } func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int, offsetTsNs int64) *MetadataProcessor { t := &MetadataProcessor{ fn: fn, - activeJobs: make(map[int64]*filer_pb.SubscribeMetadataResponse), + activeJobs: make(map[int64]*syncJobPaths), concurrencyLimit: concurrency, + activeFilePaths: make(map[util.FullPath]int), + activeDirPaths: make(map[util.FullPath]int), + descendantCount: make(map[util.FullPath]int), } t.processedTsWatermark.Store(offsetTsNs) t.activeJobsCond = sync.NewCond(&t.activeJobsLock) return t } +// pathAncestors returns all proper ancestor directories of p. +// For "/a/b/c", returns ["/a/b", "/a", "/"]. +func pathAncestors(p util.FullPath) []util.FullPath { + var ancestors []util.FullPath + s := string(p) + for { + parent := path.Dir(s) + if parent == s { + break + } + ancestors = append(ancestors, util.FullPath(parent)) + s = parent + } + return ancestors +} + +// addPathToIndex registers a path in the conflict detection indexes. +// Must be called under activeJobsLock. +func (t *MetadataProcessor) addPathToIndex(p util.FullPath, isDirectory bool) { + if isDirectory { + t.activeDirPaths[p]++ + } else { + t.activeFilePaths[p]++ + } + for _, ancestor := range pathAncestors(p) { + t.descendantCount[ancestor]++ + } +} + +// removePathFromIndex unregisters a path from the conflict detection indexes. +// Must be called under activeJobsLock. +func (t *MetadataProcessor) removePathFromIndex(p util.FullPath, isDirectory bool) { + if isDirectory { + if t.activeDirPaths[p] <= 1 { + delete(t.activeDirPaths, p) + } else { + t.activeDirPaths[p]-- + } + } else { + if t.activeFilePaths[p] <= 1 { + delete(t.activeFilePaths, p) + } else { + t.activeFilePaths[p]-- + } + } + for _, ancestor := range pathAncestors(p) { + if t.descendantCount[ancestor] <= 1 { + delete(t.descendantCount, ancestor) + } else { + t.descendantCount[ancestor]-- + } + } +} + +// pathConflicts checks if a single path conflicts with any active job. +// Conflict rules match pairShouldWaitFor: +// - file vs file: exact same path +// - file vs dir: file.IsUnder(dir) +// - dir vs file: file.IsUnder(dir) +// - dir vs dir: either IsUnder the other +func (t *MetadataProcessor) pathConflicts(p util.FullPath, isDirectory bool) bool { + if isDirectory { + // Any active job (file or dir) strictly under this directory? + if t.descendantCount[p] > 0 { + return true + } + } else { + // Exact same file already active? + if t.activeFilePaths[p] > 0 { + return true + } + } + // Any active directory that is a proper ancestor of p? + for _, ancestor := range pathAncestors(p) { + if t.activeDirPaths[ancestor] > 0 { + return true + } + } + return false +} + +func (t *MetadataProcessor) conflictsWith(resp *filer_pb.SubscribeMetadataResponse) bool { + p, newPath, isDirectory := extractPathsFromMetadata(resp) + if t.pathConflicts(p, isDirectory) { + return true + } + if newPath != "" && t.pathConflicts(newPath, isDirectory) { + return true + } + return false +} + func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse) { if filer_pb.IsEmpty(resp) { return @@ -41,7 +171,18 @@ func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse) for len(t.activeJobs) >= t.concurrencyLimit || t.conflictsWith(resp) { t.activeJobsCond.Wait() } - t.activeJobs[resp.TsNs] = resp + + p, newPath, isDirectory := extractPathsFromMetadata(resp) + jobPaths := &syncJobPaths{path: p, newPath: newPath, isDirectory: isDirectory} + + t.activeJobs[resp.TsNs] = jobPaths + t.addPathToIndex(p, isDirectory) + if newPath != "" { + t.addPathToIndex(newPath, isDirectory) + } + + heap.Push(&t.tsHeap, resp.TsNs) + go func() { if err := util.Retry("metadata processor", func() error { @@ -54,95 +195,48 @@ func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse) defer t.activeJobsLock.Unlock() delete(t.activeJobs, resp.TsNs) + t.removePathFromIndex(jobPaths.path, jobPaths.isDirectory) + if jobPaths.newPath != "" { + t.removePathFromIndex(jobPaths.newPath, jobPaths.isDirectory) + } - // if is the oldest job, write down the watermark - isOldest := true - for t := range t.activeJobs { - if resp.TsNs > t { - isOldest = false + // Lazy-clean stale entries from heap top (already-completed jobs). + // Each entry is pushed once and popped once: O(log n) amortized. + for t.tsHeap.Len() > 0 { + if _, active := t.activeJobs[t.tsHeap[0]]; active { break } + heap.Pop(&t.tsHeap) } - if isOldest { + // If this was the oldest job, advance the watermark. + if t.tsHeap.Len() == 0 || resp.TsNs < t.tsHeap[0] { t.processedTsWatermark.Store(resp.TsNs) } t.activeJobsCond.Signal() }() } -func (t *MetadataProcessor) conflictsWith(resp *filer_pb.SubscribeMetadataResponse) bool { - for _, r := range t.activeJobs { - if shouldWaitFor(resp, r) { - return true - } - } - return false -} - -// a is one possible job to schedule -// b is one existing active job -func shouldWaitFor(a *filer_pb.SubscribeMetadataResponse, b *filer_pb.SubscribeMetadataResponse) bool { - aPath, aNewPath, aIsDirectory := extractPathsFromMetadata(a) - bPath, bNewPath, bIsDirectory := extractPathsFromMetadata(b) - - if pairShouldWaitFor(aPath, bPath, aIsDirectory, bIsDirectory) { - return true - } - if aNewPath != "" { - if pairShouldWaitFor(aNewPath, bPath, aIsDirectory, bIsDirectory) { - return true - } - } - if bNewPath != "" { - if pairShouldWaitFor(aPath, bNewPath, aIsDirectory, bIsDirectory) { - return true - } - } - if aNewPath != "" && bNewPath != "" { - if pairShouldWaitFor(aNewPath, bNewPath, aIsDirectory, bIsDirectory) { - return true - } - } - return false -} - -func pairShouldWaitFor(aPath, bPath util.FullPath, aIsDirectory, bIsDirectory bool) bool { - if bIsDirectory { - if aIsDirectory { - return aPath.IsUnder(bPath) || bPath.IsUnder(aPath) - } else { - return aPath.IsUnder(bPath) - } - } else { - if aIsDirectory { - return bPath.IsUnder(aPath) - } else { - return aPath == bPath - } - } -} - -func extractPathsFromMetadata(resp *filer_pb.SubscribeMetadataResponse) (path, newPath util.FullPath, isDirectory bool) { +func extractPathsFromMetadata(resp *filer_pb.SubscribeMetadataResponse) (p, newPath util.FullPath, isDirectory bool) { oldEntry := resp.EventNotification.OldEntry newEntry := resp.EventNotification.NewEntry // create if filer_pb.IsCreate(resp) { - path = util.FullPath(resp.Directory).Child(newEntry.Name) + p = util.FullPath(resp.Directory).Child(newEntry.Name) isDirectory = newEntry.IsDirectory return } if filer_pb.IsDelete(resp) { - path = util.FullPath(resp.Directory).Child(oldEntry.Name) + p = util.FullPath(resp.Directory).Child(oldEntry.Name) isDirectory = oldEntry.IsDirectory return } if filer_pb.IsUpdate(resp) { - path = util.FullPath(resp.Directory).Child(newEntry.Name) + p = util.FullPath(resp.Directory).Child(newEntry.Name) isDirectory = newEntry.IsDirectory return } // renaming - path = util.FullPath(resp.Directory).Child(oldEntry.Name) + p = util.FullPath(resp.Directory).Child(oldEntry.Name) isDirectory = oldEntry.IsDirectory newPath = util.FullPath(resp.EventNotification.NewParentPath).Child(newEntry.Name) return diff --git a/weed/command/filer_sync_jobs_test.go b/weed/command/filer_sync_jobs_test.go new file mode 100644 index 000000000..207712d92 --- /dev/null +++ b/weed/command/filer_sync_jobs_test.go @@ -0,0 +1,387 @@ +package command + +import ( + "container/heap" + "fmt" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +func makeResp(dir, name string, isDir bool, tsNs int64, isNew bool) *filer_pb.SubscribeMetadataResponse { + resp := &filer_pb.SubscribeMetadataResponse{ + Directory: dir, + TsNs: tsNs, + EventNotification: &filer_pb.EventNotification{}, + } + entry := &filer_pb.Entry{ + Name: name, + IsDirectory: isDir, + } + if isNew { + resp.EventNotification.NewEntry = entry + } else { + resp.EventNotification.OldEntry = entry + } + return resp +} + +func makeRenameResp(oldDir, oldName, newDir, newName string, isDir bool, tsNs int64) *filer_pb.SubscribeMetadataResponse { + return &filer_pb.SubscribeMetadataResponse{ + Directory: oldDir, + TsNs: tsNs, + EventNotification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{ + Name: oldName, + IsDirectory: isDir, + }, + NewEntry: &filer_pb.Entry{ + Name: newName, + IsDirectory: isDir, + }, + NewParentPath: newDir, + }, + } +} + +func TestPathAncestors(t *testing.T) { + tests := []struct { + path util.FullPath + expected []util.FullPath + }{ + {"/a/b/c/file.txt", []util.FullPath{"/a/b/c", "/a/b", "/a", "/"}}, + {"/a/b", []util.FullPath{"/a", "/"}}, + {"/a", []util.FullPath{"/"}}, + {"/", nil}, + } + for _, tt := range tests { + got := pathAncestors(tt.path) + if len(got) != len(tt.expected) { + t.Errorf("pathAncestors(%q) = %v, want %v", tt.path, got, tt.expected) + continue + } + for i := range got { + if got[i] != tt.expected[i] { + t.Errorf("pathAncestors(%q)[%d] = %q, want %q", tt.path, i, got[i], tt.expected[i]) + } + } + } +} + +// TestFileVsFileConflict verifies that two file operations on the same path conflict, +// and on different paths do not. +func TestFileVsFileConflict(t *testing.T) { + noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil } + p := NewMetadataProcessor(noop, 100, 0) + + // Add a file job + active := makeResp("/dir1", "file.txt", false, 1, true) + path, newPath, isDir := extractPathsFromMetadata(active) + p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir} + p.addPathToIndex(path, isDir) + + // Same file should conflict + same := makeResp("/dir1", "file.txt", false, 2, true) + if !p.conflictsWith(same) { + t.Error("expected conflict for same file path") + } + + // Different file should not conflict + diff := makeResp("/dir1", "other.txt", false, 3, true) + if p.conflictsWith(diff) { + t.Error("unexpected conflict for different file path") + } + + // File in different directory should not conflict + diffDir := makeResp("/dir2", "file.txt", false, 4, true) + if p.conflictsWith(diffDir) { + t.Error("unexpected conflict for file in different directory") + } +} + +// TestFileUnderActiveDirConflict verifies that a file under an active directory operation +// conflicts, but a file outside does not. +func TestFileUnderActiveDirConflict(t *testing.T) { + noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil } + p := NewMetadataProcessor(noop, 100, 0) + + // Add a directory job at /dir1 + active := makeResp("/", "dir1", true, 1, true) + path, newPath, isDir := extractPathsFromMetadata(active) + p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir} + p.addPathToIndex(path, isDir) + + // File under /dir1 should conflict + under := makeResp("/dir1", "file.txt", false, 2, true) + if !p.conflictsWith(under) { + t.Error("expected conflict for file under active directory") + } + + // File deeply nested under /dir1 should conflict + deep := makeResp("/dir1/sub/deep", "file.txt", false, 3, true) + if !p.conflictsWith(deep) { + t.Error("expected conflict for deeply nested file under active directory") + } + + // File in /dir2 should not conflict + outside := makeResp("/dir2", "file.txt", false, 4, true) + if p.conflictsWith(outside) { + t.Error("unexpected conflict for file outside active directory") + } + + // File at /dir1 itself (not under, at) should not conflict + // because IsUnder is strict: "/dir1".IsUnder("/dir1") == false + atSame := makeResp("/", "dir1", false, 5, true) + if p.conflictsWith(atSame) { + t.Error("unexpected conflict for file at same path as directory (IsUnder is strict)") + } +} + +// TestDirWithActiveFileUnder verifies that a directory operation conflicts when +// there are active file jobs under it. +func TestDirWithActiveFileUnder(t *testing.T) { + noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil } + p := NewMetadataProcessor(noop, 100, 0) + + // Add file jobs under /dir1 + f1 := makeResp("/dir1/sub", "file.txt", false, 1, true) + path, newPath, isDir := extractPathsFromMetadata(f1) + p.activeJobs[f1.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir} + p.addPathToIndex(path, isDir) + + // Directory /dir1 should conflict (has active file under it) + dirOp := makeResp("/", "dir1", true, 2, true) + if !p.conflictsWith(dirOp) { + t.Error("expected conflict for directory with active file under it") + } + + // Directory /dir2 should not conflict + dirOp2 := makeResp("/", "dir2", true, 3, true) + if p.conflictsWith(dirOp2) { + t.Error("unexpected conflict for directory with no active jobs under it") + } +} + +// TestDirVsDirConflict verifies ancestor/descendant directory conflict detection. +func TestDirVsDirConflict(t *testing.T) { + noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil } + p := NewMetadataProcessor(noop, 100, 0) + + // Add directory job at /a/b + active := makeResp("/a", "b", true, 1, true) + path, newPath, isDir := extractPathsFromMetadata(active) + p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir} + p.addPathToIndex(path, isDir) + + // /a/b/c (descendant) should conflict + desc := makeResp("/a/b", "c", true, 2, true) + if !p.conflictsWith(desc) { + t.Error("expected conflict for descendant directory") + } + + // /a (ancestor) should conflict + anc := makeResp("/", "a", true, 3, true) + if !p.conflictsWith(anc) { + t.Error("expected conflict for ancestor directory") + } + + // Same directory should NOT conflict (IsUnder is strict, not equal) + same := makeResp("/a", "b", true, 4, true) + if p.conflictsWith(same) { + t.Error("unexpected conflict for same directory (IsUnder is strict)") + } + + // Sibling directory should not conflict + sibling := makeResp("/a", "c", true, 5, true) + if p.conflictsWith(sibling) { + t.Error("unexpected conflict for sibling directory") + } +} + +// TestRenameConflict verifies that rename events with two paths check both paths. +func TestRenameConflict(t *testing.T) { + noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil } + p := NewMetadataProcessor(noop, 100, 0) + + // Add file job at /dir1/file.txt + f1 := makeResp("/dir1", "file.txt", false, 1, true) + path, newPath, isDir := extractPathsFromMetadata(f1) + p.activeJobs[f1.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir} + p.addPathToIndex(path, isDir) + + // Rename from /dir2/a.txt to /dir1/file.txt should conflict (newPath matches) + rename := makeRenameResp("/dir2", "a.txt", "/dir1", "file.txt", false, 2) + if !p.conflictsWith(rename) { + t.Error("expected conflict for rename whose destination matches active file") + } + + // Rename from /dir1/file.txt to /dir2/b.txt should conflict (oldPath matches) + rename2 := makeRenameResp("/dir1", "file.txt", "/dir2", "b.txt", false, 3) + if !p.conflictsWith(rename2) { + t.Error("expected conflict for rename whose source matches active file") + } + + // Rename between unrelated paths should not conflict + rename3 := makeRenameResp("/dir3", "x.txt", "/dir4", "y.txt", false, 4) + if p.conflictsWith(rename3) { + t.Error("unexpected conflict for rename between unrelated paths") + } +} + +// TestActiveRenameConflict verifies that an active rename job registers both paths. +func TestActiveRenameConflict(t *testing.T) { + noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil } + p := NewMetadataProcessor(noop, 100, 0) + + // Add active rename job: /dir1/old.txt -> /dir2/new.txt + rename := makeRenameResp("/dir1", "old.txt", "/dir2", "new.txt", false, 1) + path, newPath, isDir := extractPathsFromMetadata(rename) + p.activeJobs[rename.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir} + p.addPathToIndex(path, isDir) + if newPath != "" { + p.addPathToIndex(newPath, isDir) + } + + // File at /dir1/old.txt should conflict + f1 := makeResp("/dir1", "old.txt", false, 2, true) + if !p.conflictsWith(f1) { + t.Error("expected conflict at rename source path") + } + + // File at /dir2/new.txt should conflict + f2 := makeResp("/dir2", "new.txt", false, 3, true) + if !p.conflictsWith(f2) { + t.Error("expected conflict at rename destination path") + } + + // File at unrelated path should not conflict + f3 := makeResp("/dir3", "other.txt", false, 4, true) + if p.conflictsWith(f3) { + t.Error("unexpected conflict at unrelated path") + } +} + +// TestRootDirConflict verifies that an active job at / conflicts with everything. +func TestRootDirConflict(t *testing.T) { + noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil } + p := NewMetadataProcessor(noop, 100, 0) + + // Add directory job at / + // Note: a dir entry at "/" would be created as FullPath("/").Child("somedir") + // But let's test what happens with an active dir at /some/path and check root + active := makeResp("/some", "dir", true, 1, true) + path, newPath, isDir := extractPathsFromMetadata(active) + p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir} + p.addPathToIndex(path, isDir) + + // Root dir should conflict because active dir /some/dir is under / + // A new directory at "/" should see descendantCount["/"] > 0 + if p.descendantCount["/"] <= 0 { + t.Error("expected descendantCount['/'] > 0 for active job under root") + } +} + +// TestIndexCleanup verifies that removing a job properly cleans up all indexes. +func TestIndexCleanup(t *testing.T) { + noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil } + p := NewMetadataProcessor(noop, 100, 0) + + // Add then remove a file job + path := util.FullPath("/a/b/c/file.txt") + p.addPathToIndex(path, false) + + if p.activeFilePaths[path] != 1 { + t.Errorf("expected activeFilePaths count 1, got %d", p.activeFilePaths[path]) + } + if p.descendantCount["/a/b/c"] != 1 { + t.Errorf("expected descendantCount['/a/b/c'] = 1, got %d", p.descendantCount["/a/b/c"]) + } + + p.removePathFromIndex(path, false) + + if len(p.activeFilePaths) != 0 { + t.Errorf("expected empty activeFilePaths after removal, got %v", p.activeFilePaths) + } + if len(p.descendantCount) != 0 { + t.Errorf("expected empty descendantCount after removal, got %v", p.descendantCount) + } +} + +// TestWatermarkWithHeap verifies watermark advancement using the min-heap. +func TestWatermarkWithHeap(t *testing.T) { + noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil } + p := NewMetadataProcessor(noop, 100, 0) + + // Simulate adding jobs in order + for _, ts := range []int64{10, 20, 30} { + jobPath := util.FullPath("/file" + string(rune('0'+ts/10))) + p.activeJobs[ts] = &syncJobPaths{path: jobPath, isDirectory: false} + p.addPathToIndex(jobPath, false) + heap.Push(&p.tsHeap, ts) + } + + if p.tsHeap[0] != 10 { + t.Errorf("expected heap min=10, got %d", p.tsHeap[0]) + } + + // Remove non-oldest (ts=20) — heap top should stay 10 + delete(p.activeJobs, 20) + p.removePathFromIndex("/file2", false) + // Lazy clean: top is 10 which is still active, so no pop + for p.tsHeap.Len() > 0 { + if _, active := p.activeJobs[p.tsHeap[0]]; active { + break + } + heap.Pop(&p.tsHeap) + } + if p.tsHeap[0] != 10 { + t.Errorf("expected heap min=10 after removing 20, got %d", p.tsHeap[0]) + } + + // Remove oldest (ts=10) — lazy clean should find 30 + delete(p.activeJobs, 10) + p.removePathFromIndex("/file1", false) + for p.tsHeap.Len() > 0 { + if _, active := p.activeJobs[p.tsHeap[0]]; active { + break + } + heap.Pop(&p.tsHeap) + } + if p.tsHeap.Len() != 1 || p.tsHeap[0] != 30 { + t.Errorf("expected heap min=30 after removing 10 and 20, got len=%d", p.tsHeap.Len()) + } +} + +// benchResult prevents the compiler from optimizing away the conflict check. +var benchResult bool + +// BenchmarkConflictCheck measures conflict check cost with varying active job counts. +// With the index-based approach, cost should be O(depth) regardless of job count. +func BenchmarkConflictCheck(b *testing.B) { + for _, numJobs := range []int{32, 256, 1024} { + b.Run(fmt.Sprintf("jobs=%d", numJobs), func(b *testing.B) { + noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil } + p := NewMetadataProcessor(noop, numJobs+1, 0) + + // Fill with active jobs in different directories + for i := range numJobs { + dir := fmt.Sprintf("/dir%d/sub%d", i/100, i%100) + name := fmt.Sprintf("file%d.txt", i) + resp := makeResp(dir, name, false, int64(i+1), true) + path, newPath, isDir := extractPathsFromMetadata(resp) + p.activeJobs[resp.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir} + p.addPathToIndex(path, isDir) + } + + // Benchmark conflict check for a non-conflicting event + probe := makeResp("/other/path", "test.txt", false, int64(numJobs+1), true) + var r bool + b.ResetTimer() + for i := 0; i < b.N; i++ { + r = p.conflictsWith(probe) + } + benchResult = r + }) + } +}