Browse Source

filer.sync: replace O(n) conflict check with O(depth) index lookups (#8772)

* filer.sync: replace O(n) conflict check with O(depth) index lookups

The MetadataProcessor.conflictsWith() scanned all active jobs linearly
for every new event dispatch. At high concurrency (256-1024), this O(n)
scan under the activeJobsLock became a bottleneck that throttled the
event dispatch pipeline, negating the benefit of higher -concurrency
values.

Replace the linear scan with three index maps:
- activeFilePaths: O(1) exact file path lookup
- activeDirPaths: O(1) directory path lookup per ancestor
- descendantCount: O(1) check for active jobs under a directory

Conflict check is now O(depth) where depth is the path depth (typically
3-6 levels), constant regardless of active job count. Benchmark confirms
~81ns per check whether there are 32 or 1024 active jobs.

Also replace the O(n) watermark scan with minActiveTs tracking so
non-oldest job completions are O(1).

Ref: #8771

* filer.sync: replace O(n) watermark rescan with min-heap lazy deletion

Address review feedback:
- Replace minActiveTs O(n) rescan with a tsMinHeap using lazy deletion.
  Each TsNs is pushed once and popped once, giving O(log n) amortized
  watermark tracking regardless of completion order.
- Fix benchmark to consume conflictsWith result via package-level sink
  variable to prevent compiler elision.

The watermark advancement semantics (conservative, sets to completing
job's TsNs) are unchanged from the original code. This is intentionally
safe for idempotent replay on restart.
pull/8729/merge
Chris Lu 23 hours ago
committed by GitHub
parent
commit
29bdbb3c48
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 234
      weed/command/filer_sync_jobs.go
  2. 387
      weed/command/filer_sync_jobs_test.go

234
weed/command/filer_sync_jobs.go

@ -1,6 +1,8 @@
package command
import (
"container/heap"
"path"
"sync"
"sync/atomic"
@ -10,26 +12,154 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util"
)
// tsMinHeap implements heap.Interface for int64 timestamps.
type tsMinHeap []int64
func (h tsMinHeap) Len() int { return len(h) }
func (h tsMinHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h tsMinHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *tsMinHeap) Push(x any) { *h = append(*h, x.(int64)) }
func (h *tsMinHeap) Pop() any {
old := *h
n := len(old)
x := old[n-1]
*h = old[:n-1]
return x
}
// syncJobPaths records the filesystem paths touched by one in-flight sync job,
// so the conflict indexes can be updated on completion without re-parsing the event.
type syncJobPaths struct {
	path util.FullPath
	newPath util.FullPath // empty for non-renames
	isDirectory bool
}
type MetadataProcessor struct {
activeJobs map[int64]*filer_pb.SubscribeMetadataResponse
activeJobsLock sync.Mutex
activeJobsCond *sync.Cond
concurrencyLimit int
fn pb.ProcessMetadataFunc
activeJobs map[int64]*syncJobPaths
activeJobsLock sync.Mutex
activeJobsCond *sync.Cond
concurrencyLimit int
fn pb.ProcessMetadataFunc
processedTsWatermark atomic.Int64
// Indexes for O(depth) conflict detection, replacing O(n) linear scan.
// activeFilePaths counts active file jobs at each exact path.
activeFilePaths map[util.FullPath]int
// activeDirPaths counts active directory jobs at each exact path.
activeDirPaths map[util.FullPath]int
// descendantCount counts active jobs (file or dir) strictly under each directory.
descendantCount map[util.FullPath]int
// tsHeap is a min-heap of active job timestamps with lazy deletion,
// used for O(log n) amortized watermark tracking.
tsHeap tsMinHeap
}
// NewMetadataProcessor builds a processor that runs fn for each event with the
// given concurrency limit, resuming the processed watermark from offsetTsNs.
// NOTE: the diff residue kept the removed initializer
// `activeJobs: make(map[int64]*filer_pb.SubscribeMetadataResponse)` alongside
// the new one; only the *syncJobPaths map initializer is retained here.
func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int, offsetTsNs int64) *MetadataProcessor {
	t := &MetadataProcessor{
		fn:               fn,
		activeJobs:       make(map[int64]*syncJobPaths),
		concurrencyLimit: concurrency,
		activeFilePaths:  make(map[util.FullPath]int),
		activeDirPaths:   make(map[util.FullPath]int),
		descendantCount:  make(map[util.FullPath]int),
	}
	t.processedTsWatermark.Store(offsetTsNs)
	t.activeJobsCond = sync.NewCond(&t.activeJobsLock)
	return t
}
// pathAncestors returns all proper ancestor directories of p.
// For "/a/b/c", returns ["/a/b", "/a", "/"].
func pathAncestors(p util.FullPath) []util.FullPath {
	var out []util.FullPath
	for cur := string(p); ; {
		up := path.Dir(cur)
		// path.Dir("/") == "/": reaching a fixed point means we are done.
		if up == cur {
			return out
		}
		out = append(out, util.FullPath(up))
		cur = up
	}
}
// addPathToIndex registers a path in the conflict detection indexes.
// Must be called under activeJobsLock.
func (t *MetadataProcessor) addPathToIndex(p util.FullPath, isDirectory bool) {
	// Pick the exact-path index matching the entry kind, then bump it.
	counts := t.activeFilePaths
	if isDirectory {
		counts = t.activeDirPaths
	}
	counts[p]++
	// Every proper ancestor gains one descendant.
	for _, dir := range pathAncestors(p) {
		t.descendantCount[dir]++
	}
}
// removePathFromIndex unregisters a path from the conflict detection indexes.
// Must be called under activeJobsLock.
func (t *MetadataProcessor) removePathFromIndex(p util.FullPath, isDirectory bool) {
	// decrement lowers the refcount for key, deleting the entry once it
	// reaches zero so the maps never accumulate dead keys.
	decrement := func(m map[util.FullPath]int, key util.FullPath) {
		if m[key] <= 1 {
			delete(m, key)
		} else {
			m[key]--
		}
	}
	if isDirectory {
		decrement(t.activeDirPaths, p)
	} else {
		decrement(t.activeFilePaths, p)
	}
	for _, dir := range pathAncestors(p) {
		decrement(t.descendantCount, dir)
	}
}
// pathConflicts checks if a single path conflicts with any active job.
// Conflict rules match pairShouldWaitFor:
// - file vs file: exact same path
// - file vs dir: file.IsUnder(dir)
// - dir vs file: file.IsUnder(dir)
// - dir vs dir: either IsUnder the other
func (t *MetadataProcessor) pathConflicts(p util.FullPath, isDirectory bool) bool {
	// A directory clashes with anything active strictly beneath it;
	// a file clashes only with an active file job at the identical path.
	if isDirectory && t.descendantCount[p] > 0 {
		return true
	}
	if !isDirectory && t.activeFilePaths[p] > 0 {
		return true
	}
	// Either kind clashes with an active directory job at any proper ancestor.
	for _, dir := range pathAncestors(p) {
		if t.activeDirPaths[dir] > 0 {
			return true
		}
	}
	return false
}
// conflictsWith reports whether resp touches any path — the source path or,
// for renames, the destination path — that collides with an in-flight job.
func (t *MetadataProcessor) conflictsWith(resp *filer_pb.SubscribeMetadataResponse) bool {
	p, newPath, isDirectory := extractPathsFromMetadata(resp)
	if t.pathConflicts(p, isDirectory) {
		return true
	}
	return newPath != "" && t.pathConflicts(newPath, isDirectory)
}
func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse) {
if filer_pb.IsEmpty(resp) {
return
@ -41,7 +171,18 @@ func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse)
for len(t.activeJobs) >= t.concurrencyLimit || t.conflictsWith(resp) {
t.activeJobsCond.Wait()
}
t.activeJobs[resp.TsNs] = resp
p, newPath, isDirectory := extractPathsFromMetadata(resp)
jobPaths := &syncJobPaths{path: p, newPath: newPath, isDirectory: isDirectory}
t.activeJobs[resp.TsNs] = jobPaths
t.addPathToIndex(p, isDirectory)
if newPath != "" {
t.addPathToIndex(newPath, isDirectory)
}
heap.Push(&t.tsHeap, resp.TsNs)
go func() {
if err := util.Retry("metadata processor", func() error {
@ -54,95 +195,48 @@ func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse)
defer t.activeJobsLock.Unlock()
delete(t.activeJobs, resp.TsNs)
t.removePathFromIndex(jobPaths.path, jobPaths.isDirectory)
if jobPaths.newPath != "" {
t.removePathFromIndex(jobPaths.newPath, jobPaths.isDirectory)
}
// if is the oldest job, write down the watermark
isOldest := true
for t := range t.activeJobs {
if resp.TsNs > t {
isOldest = false
// Lazy-clean stale entries from heap top (already-completed jobs).
// Each entry is pushed once and popped once: O(log n) amortized.
for t.tsHeap.Len() > 0 {
if _, active := t.activeJobs[t.tsHeap[0]]; active {
break
}
heap.Pop(&t.tsHeap)
}
if isOldest {
// If this was the oldest job, advance the watermark.
if t.tsHeap.Len() == 0 || resp.TsNs < t.tsHeap[0] {
t.processedTsWatermark.Store(resp.TsNs)
}
t.activeJobsCond.Signal()
}()
}
// conflictsWith (pre-index version) decides whether resp must wait by linearly
// scanning every active job and deferring to shouldWaitFor for the pairwise check.
// NOTE(review): per the commit message this O(n) scan is the code being removed;
// it also no longer type-checks against the new activeJobs map of *syncJobPaths —
// this is residue of the diff's deleted side, kept here only for reference.
func (t *MetadataProcessor) conflictsWith(resp *filer_pb.SubscribeMetadataResponse) bool {
	for _, r := range t.activeJobs {
		if shouldWaitFor(resp, r) {
			return true
		}
	}
	return false
}
// a is one possible job to schedule
// b is one existing active job
func shouldWaitFor(a *filer_pb.SubscribeMetadataResponse, b *filer_pb.SubscribeMetadataResponse) bool {
	aPath, aNewPath, aIsDirectory := extractPathsFromMetadata(a)
	bPath, bNewPath, bIsDirectory := extractPathsFromMetadata(b)

	// Collect every path each event touches (renames touch two), then test
	// every candidate/active pair; any collision forces the candidate to wait.
	candidate := []util.FullPath{aPath}
	if aNewPath != "" {
		candidate = append(candidate, aNewPath)
	}
	active := []util.FullPath{bPath}
	if bNewPath != "" {
		active = append(active, bNewPath)
	}
	for _, cp := range candidate {
		for _, ap := range active {
			if pairShouldWaitFor(cp, ap, aIsDirectory, bIsDirectory) {
				return true
			}
		}
	}
	return false
}
// pairShouldWaitFor decides whether path a (candidate) collides with path b
// (active), given whether each side is a directory.
func pairShouldWaitFor(aPath, bPath util.FullPath, aIsDirectory, bIsDirectory bool) bool {
	switch {
	case aIsDirectory && bIsDirectory:
		// dir vs dir: nesting in either direction is a collision.
		return aPath.IsUnder(bPath) || bPath.IsUnder(aPath)
	case bIsDirectory:
		// candidate file vs active dir: collision when the file sits inside it.
		return aPath.IsUnder(bPath)
	case aIsDirectory:
		// candidate dir vs active file: collision when the file sits inside it.
		return bPath.IsUnder(aPath)
	default:
		// file vs file: only the identical path collides.
		return aPath == bPath
	}
}
func extractPathsFromMetadata(resp *filer_pb.SubscribeMetadataResponse) (path, newPath util.FullPath, isDirectory bool) {
func extractPathsFromMetadata(resp *filer_pb.SubscribeMetadataResponse) (p, newPath util.FullPath, isDirectory bool) {
oldEntry := resp.EventNotification.OldEntry
newEntry := resp.EventNotification.NewEntry
// create
if filer_pb.IsCreate(resp) {
path = util.FullPath(resp.Directory).Child(newEntry.Name)
p = util.FullPath(resp.Directory).Child(newEntry.Name)
isDirectory = newEntry.IsDirectory
return
}
if filer_pb.IsDelete(resp) {
path = util.FullPath(resp.Directory).Child(oldEntry.Name)
p = util.FullPath(resp.Directory).Child(oldEntry.Name)
isDirectory = oldEntry.IsDirectory
return
}
if filer_pb.IsUpdate(resp) {
path = util.FullPath(resp.Directory).Child(newEntry.Name)
p = util.FullPath(resp.Directory).Child(newEntry.Name)
isDirectory = newEntry.IsDirectory
return
}
// renaming
path = util.FullPath(resp.Directory).Child(oldEntry.Name)
p = util.FullPath(resp.Directory).Child(oldEntry.Name)
isDirectory = oldEntry.IsDirectory
newPath = util.FullPath(resp.EventNotification.NewParentPath).Child(newEntry.Name)
return

387
weed/command/filer_sync_jobs_test.go

@ -0,0 +1,387 @@
package command
import (
"container/heap"
"fmt"
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// makeResp builds a single-entry metadata event for tests: the entry lands in
// NewEntry when isNew is true, otherwise in OldEntry.
func makeResp(dir, name string, isDir bool, tsNs int64, isNew bool) *filer_pb.SubscribeMetadataResponse {
	entry := &filer_pb.Entry{
		Name:        name,
		IsDirectory: isDir,
	}
	notification := &filer_pb.EventNotification{}
	if isNew {
		notification.NewEntry = entry
	} else {
		notification.OldEntry = entry
	}
	return &filer_pb.SubscribeMetadataResponse{
		Directory:         dir,
		TsNs:              tsNs,
		EventNotification: notification,
	}
}
// makeRenameResp builds a rename metadata event moving oldDir/oldName to newDir/newName.
func makeRenameResp(oldDir, oldName, newDir, newName string, isDir bool, tsNs int64) *filer_pb.SubscribeMetadataResponse {
	notification := &filer_pb.EventNotification{
		OldEntry:      &filer_pb.Entry{Name: oldName, IsDirectory: isDir},
		NewEntry:      &filer_pb.Entry{Name: newName, IsDirectory: isDir},
		NewParentPath: newDir,
	}
	return &filer_pb.SubscribeMetadataResponse{
		Directory:         oldDir,
		TsNs:              tsNs,
		EventNotification: notification,
	}
}
// TestPathAncestors checks ancestor enumeration at several depths, including root.
func TestPathAncestors(t *testing.T) {
	cases := []struct {
		path     util.FullPath
		expected []util.FullPath
	}{
		{"/a/b/c/file.txt", []util.FullPath{"/a/b/c", "/a/b", "/a", "/"}},
		{"/a/b", []util.FullPath{"/a", "/"}},
		{"/a", []util.FullPath{"/"}},
		{"/", nil},
	}
	for _, tc := range cases {
		got := pathAncestors(tc.path)
		if len(got) != len(tc.expected) {
			t.Errorf("pathAncestors(%q) = %v, want %v", tc.path, got, tc.expected)
			continue
		}
		for i, ancestor := range got {
			if ancestor != tc.expected[i] {
				t.Errorf("pathAncestors(%q)[%d] = %q, want %q", tc.path, i, ancestor, tc.expected[i])
			}
		}
	}
}
// TestFileVsFileConflict verifies that two file operations on the same path conflict,
// and on different paths do not.
func TestFileVsFileConflict(t *testing.T) {
noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil }
p := NewMetadataProcessor(noop, 100, 0)
// Add a file job
active := makeResp("/dir1", "file.txt", false, 1, true)
path, newPath, isDir := extractPathsFromMetadata(active)
p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir}
p.addPathToIndex(path, isDir)
// Same file should conflict
same := makeResp("/dir1", "file.txt", false, 2, true)
if !p.conflictsWith(same) {
t.Error("expected conflict for same file path")
}
// Different file should not conflict
diff := makeResp("/dir1", "other.txt", false, 3, true)
if p.conflictsWith(diff) {
t.Error("unexpected conflict for different file path")
}
// File in different directory should not conflict
diffDir := makeResp("/dir2", "file.txt", false, 4, true)
if p.conflictsWith(diffDir) {
t.Error("unexpected conflict for file in different directory")
}
}
// TestFileUnderActiveDirConflict verifies that a file under an active directory operation
// conflicts, but a file outside does not.
func TestFileUnderActiveDirConflict(t *testing.T) {
noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil }
p := NewMetadataProcessor(noop, 100, 0)
// Add a directory job at /dir1
active := makeResp("/", "dir1", true, 1, true)
path, newPath, isDir := extractPathsFromMetadata(active)
p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir}
p.addPathToIndex(path, isDir)
// File under /dir1 should conflict
under := makeResp("/dir1", "file.txt", false, 2, true)
if !p.conflictsWith(under) {
t.Error("expected conflict for file under active directory")
}
// File deeply nested under /dir1 should conflict
deep := makeResp("/dir1/sub/deep", "file.txt", false, 3, true)
if !p.conflictsWith(deep) {
t.Error("expected conflict for deeply nested file under active directory")
}
// File in /dir2 should not conflict
outside := makeResp("/dir2", "file.txt", false, 4, true)
if p.conflictsWith(outside) {
t.Error("unexpected conflict for file outside active directory")
}
// File at /dir1 itself (not under, at) should not conflict
// because IsUnder is strict: "/dir1".IsUnder("/dir1") == false
atSame := makeResp("/", "dir1", false, 5, true)
if p.conflictsWith(atSame) {
t.Error("unexpected conflict for file at same path as directory (IsUnder is strict)")
}
}
// TestDirWithActiveFileUnder verifies that a directory operation conflicts when
// there are active file jobs under it.
func TestDirWithActiveFileUnder(t *testing.T) {
noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil }
p := NewMetadataProcessor(noop, 100, 0)
// Add file jobs under /dir1
f1 := makeResp("/dir1/sub", "file.txt", false, 1, true)
path, newPath, isDir := extractPathsFromMetadata(f1)
p.activeJobs[f1.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir}
p.addPathToIndex(path, isDir)
// Directory /dir1 should conflict (has active file under it)
dirOp := makeResp("/", "dir1", true, 2, true)
if !p.conflictsWith(dirOp) {
t.Error("expected conflict for directory with active file under it")
}
// Directory /dir2 should not conflict
dirOp2 := makeResp("/", "dir2", true, 3, true)
if p.conflictsWith(dirOp2) {
t.Error("unexpected conflict for directory with no active jobs under it")
}
}
// TestDirVsDirConflict verifies ancestor/descendant directory conflict detection.
func TestDirVsDirConflict(t *testing.T) {
noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil }
p := NewMetadataProcessor(noop, 100, 0)
// Add directory job at /a/b
active := makeResp("/a", "b", true, 1, true)
path, newPath, isDir := extractPathsFromMetadata(active)
p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir}
p.addPathToIndex(path, isDir)
// /a/b/c (descendant) should conflict
desc := makeResp("/a/b", "c", true, 2, true)
if !p.conflictsWith(desc) {
t.Error("expected conflict for descendant directory")
}
// /a (ancestor) should conflict
anc := makeResp("/", "a", true, 3, true)
if !p.conflictsWith(anc) {
t.Error("expected conflict for ancestor directory")
}
// Same directory should NOT conflict (IsUnder is strict, not equal)
same := makeResp("/a", "b", true, 4, true)
if p.conflictsWith(same) {
t.Error("unexpected conflict for same directory (IsUnder is strict)")
}
// Sibling directory should not conflict
sibling := makeResp("/a", "c", true, 5, true)
if p.conflictsWith(sibling) {
t.Error("unexpected conflict for sibling directory")
}
}
// TestRenameConflict verifies that rename events with two paths check both paths.
func TestRenameConflict(t *testing.T) {
noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil }
p := NewMetadataProcessor(noop, 100, 0)
// Add file job at /dir1/file.txt
f1 := makeResp("/dir1", "file.txt", false, 1, true)
path, newPath, isDir := extractPathsFromMetadata(f1)
p.activeJobs[f1.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir}
p.addPathToIndex(path, isDir)
// Rename from /dir2/a.txt to /dir1/file.txt should conflict (newPath matches)
rename := makeRenameResp("/dir2", "a.txt", "/dir1", "file.txt", false, 2)
if !p.conflictsWith(rename) {
t.Error("expected conflict for rename whose destination matches active file")
}
// Rename from /dir1/file.txt to /dir2/b.txt should conflict (oldPath matches)
rename2 := makeRenameResp("/dir1", "file.txt", "/dir2", "b.txt", false, 3)
if !p.conflictsWith(rename2) {
t.Error("expected conflict for rename whose source matches active file")
}
// Rename between unrelated paths should not conflict
rename3 := makeRenameResp("/dir3", "x.txt", "/dir4", "y.txt", false, 4)
if p.conflictsWith(rename3) {
t.Error("unexpected conflict for rename between unrelated paths")
}
}
// TestActiveRenameConflict verifies that an active rename job registers both paths.
func TestActiveRenameConflict(t *testing.T) {
noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil }
p := NewMetadataProcessor(noop, 100, 0)
// Add active rename job: /dir1/old.txt -> /dir2/new.txt
rename := makeRenameResp("/dir1", "old.txt", "/dir2", "new.txt", false, 1)
path, newPath, isDir := extractPathsFromMetadata(rename)
p.activeJobs[rename.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir}
p.addPathToIndex(path, isDir)
if newPath != "" {
p.addPathToIndex(newPath, isDir)
}
// File at /dir1/old.txt should conflict
f1 := makeResp("/dir1", "old.txt", false, 2, true)
if !p.conflictsWith(f1) {
t.Error("expected conflict at rename source path")
}
// File at /dir2/new.txt should conflict
f2 := makeResp("/dir2", "new.txt", false, 3, true)
if !p.conflictsWith(f2) {
t.Error("expected conflict at rename destination path")
}
// File at unrelated path should not conflict
f3 := makeResp("/dir3", "other.txt", false, 4, true)
if p.conflictsWith(f3) {
t.Error("unexpected conflict at unrelated path")
}
}
// TestRootDirConflict verifies that an active job at / conflicts with everything.
func TestRootDirConflict(t *testing.T) {
noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil }
p := NewMetadataProcessor(noop, 100, 0)
// Add directory job at /
// Note: a dir entry at "/" would be created as FullPath("/").Child("somedir")
// But let's test what happens with an active dir at /some/path and check root
active := makeResp("/some", "dir", true, 1, true)
path, newPath, isDir := extractPathsFromMetadata(active)
p.activeJobs[active.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir}
p.addPathToIndex(path, isDir)
// Root dir should conflict because active dir /some/dir is under /
// A new directory at "/" should see descendantCount["/"] > 0
if p.descendantCount["/"] <= 0 {
t.Error("expected descendantCount['/'] > 0 for active job under root")
}
}
// TestIndexCleanup verifies that removing a job properly cleans up all indexes.
func TestIndexCleanup(t *testing.T) {
noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil }
p := NewMetadataProcessor(noop, 100, 0)
// Add then remove a file job
path := util.FullPath("/a/b/c/file.txt")
p.addPathToIndex(path, false)
if p.activeFilePaths[path] != 1 {
t.Errorf("expected activeFilePaths count 1, got %d", p.activeFilePaths[path])
}
if p.descendantCount["/a/b/c"] != 1 {
t.Errorf("expected descendantCount['/a/b/c'] = 1, got %d", p.descendantCount["/a/b/c"])
}
p.removePathFromIndex(path, false)
if len(p.activeFilePaths) != 0 {
t.Errorf("expected empty activeFilePaths after removal, got %v", p.activeFilePaths)
}
if len(p.descendantCount) != 0 {
t.Errorf("expected empty descendantCount after removal, got %v", p.descendantCount)
}
}
// TestWatermarkWithHeap verifies watermark advancement using the min-heap.
func TestWatermarkWithHeap(t *testing.T) {
noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil }
p := NewMetadataProcessor(noop, 100, 0)
// Simulate adding jobs in order
for _, ts := range []int64{10, 20, 30} {
jobPath := util.FullPath("/file" + string(rune('0'+ts/10)))
p.activeJobs[ts] = &syncJobPaths{path: jobPath, isDirectory: false}
p.addPathToIndex(jobPath, false)
heap.Push(&p.tsHeap, ts)
}
if p.tsHeap[0] != 10 {
t.Errorf("expected heap min=10, got %d", p.tsHeap[0])
}
// Remove non-oldest (ts=20) — heap top should stay 10
delete(p.activeJobs, 20)
p.removePathFromIndex("/file2", false)
// Lazy clean: top is 10 which is still active, so no pop
for p.tsHeap.Len() > 0 {
if _, active := p.activeJobs[p.tsHeap[0]]; active {
break
}
heap.Pop(&p.tsHeap)
}
if p.tsHeap[0] != 10 {
t.Errorf("expected heap min=10 after removing 20, got %d", p.tsHeap[0])
}
// Remove oldest (ts=10) — lazy clean should find 30
delete(p.activeJobs, 10)
p.removePathFromIndex("/file1", false)
for p.tsHeap.Len() > 0 {
if _, active := p.activeJobs[p.tsHeap[0]]; active {
break
}
heap.Pop(&p.tsHeap)
}
if p.tsHeap.Len() != 1 || p.tsHeap[0] != 30 {
t.Errorf("expected heap min=30 after removing 10 and 20, got len=%d", p.tsHeap.Len())
}
}
// benchResult prevents the compiler from optimizing away the conflict check.
// Package-level sink: the benchmark assigns its last result here so the
// timed call cannot be dead-code-eliminated.
var benchResult bool
// BenchmarkConflictCheck measures conflict check cost with varying active job counts.
// With the index-based approach, cost should be O(depth) regardless of job count.
func BenchmarkConflictCheck(b *testing.B) {
for _, numJobs := range []int{32, 256, 1024} {
b.Run(fmt.Sprintf("jobs=%d", numJobs), func(b *testing.B) {
noop := func(resp *filer_pb.SubscribeMetadataResponse) error { return nil }
p := NewMetadataProcessor(noop, numJobs+1, 0)
// Fill with active jobs in different directories
for i := range numJobs {
dir := fmt.Sprintf("/dir%d/sub%d", i/100, i%100)
name := fmt.Sprintf("file%d.txt", i)
resp := makeResp(dir, name, false, int64(i+1), true)
path, newPath, isDir := extractPathsFromMetadata(resp)
p.activeJobs[resp.TsNs] = &syncJobPaths{path: path, newPath: newPath, isDirectory: isDir}
p.addPathToIndex(path, isDir)
}
// Benchmark conflict check for a non-conflicting event
probe := makeResp("/other/path", "test.txt", false, int64(numJobs+1), true)
var r bool
b.ResetTimer()
for i := 0; i < b.N; i++ {
r = p.conflictsWith(probe)
}
benchResult = r
})
}
}
Loading…
Cancel
Save