You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

243 lines
6.7 KiB

package command
import (
"container/heap"
"path"
"sync"
"sync/atomic"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// tsMinHeap is a min-heap of int64 timestamps. It provides the five methods
// of heap.Interface so it can be driven by the container/heap package.
type tsMinHeap []int64

// Len reports how many timestamps are stored.
func (h tsMinHeap) Len() int {
	return len(h)
}

// Less orders timestamps ascending, which keeps the smallest one at index 0.
func (h tsMinHeap) Less(i, j int) bool {
	return h[i] < h[j]
}

// Swap exchanges the timestamps at positions i and j.
func (h tsMinHeap) Swap(i, j int) {
	h[j], h[i] = h[i], h[j]
}

// Push appends x to the end of the slice. x must be an int64; any other
// dynamic type makes the type assertion panic.
func (h *tsMinHeap) Push(x any) {
	ts := x.(int64)
	*h = append(*h, ts)
}

// Pop removes the last element of the slice and returns it. Calling it on an
// empty heap panics with an index-out-of-range error.
func (h *tsMinHeap) Pop() any {
	s := *h
	last := len(s) - 1
	ts := s[last]
	*h = s[:last]
	return ts
}
// syncJobPaths records the path(s) and entry kind of one active sync job.
// AddSyncJob keeps it so the same paths can be removed from the conflict
// indexes once the job has finished.
type syncJobPaths struct {
// path is the primary path of the event; for a rename it is the old path.
path util.FullPath
newPath util.FullPath // empty for non-renames
// isDirectory is the directory flag of the entry the event refers to; it is
// applied to both path and newPath when they are indexed.
isDirectory bool
}
// MetadataProcessor runs metadata events through fn in background goroutines.
// AddSyncJob admits a new event only while fewer than concurrencyLimit jobs
// are active and none of the active jobs touches a conflicting path.
type MetadataProcessor struct {
// activeJobs holds the paths of every running job, keyed by the event's TsNs.
activeJobs map[int64]*syncJobPaths
// activeJobsLock guards activeJobs, the path indexes and tsHeap.
activeJobsLock sync.Mutex
// activeJobsCond is bound to activeJobsLock; AddSyncJob waits on it and each
// finishing job wakes it up.
activeJobsCond *sync.Cond
// concurrencyLimit is the maximum number of jobs allowed in activeJobs.
concurrencyLimit int
// fn is the callback invoked for every admitted event.
fn pb.ProcessMetadataFunc
// processedTsWatermark starts at the offset passed to NewMetadataProcessor
// and is advanced when the oldest active job completes.
processedTsWatermark atomic.Int64
// Indexes for O(depth) conflict detection, replacing O(n) linear scan.
// activeFilePaths counts active file jobs at each exact path.
activeFilePaths map[util.FullPath]int
// activeDirPaths counts active directory jobs at each exact path.
activeDirPaths map[util.FullPath]int
// descendantCount counts active jobs (file or dir) strictly under each directory.
descendantCount map[util.FullPath]int
// tsHeap is a min-heap of active job timestamps with lazy deletion,
// used for O(log n) amortized watermark tracking.
// "Lazy" means a finished job's timestamp stays in the heap until it
// reaches the top and is popped by a later cleanup pass.
tsHeap tsMinHeap
}
// NewMetadataProcessor returns a MetadataProcessor that feeds events to fn.
//
// concurrency is the maximum number of events processed at the same time;
// values below 1 are treated as 1. offsetTsNs is the initial value of the
// processed-timestamp watermark.
func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int, offsetTsNs int64) *MetadataProcessor {
	// AddSyncJob waits while len(activeJobs) >= concurrencyLimit. With a limit
	// below 1 that condition can never become false, so the first job would
	// block forever; fall back to strictly sequential processing instead.
	if concurrency < 1 {
		concurrency = 1
	}

	t := &MetadataProcessor{
		fn:               fn,
		activeJobs:       make(map[int64]*syncJobPaths),
		concurrencyLimit: concurrency,
		activeFilePaths:  make(map[util.FullPath]int),
		activeDirPaths:   make(map[util.FullPath]int),
		descendantCount:  make(map[util.FullPath]int),
	}
	t.processedTsWatermark.Store(offsetTsNs)
	// The condition variable shares the mutex that guards the job bookkeeping.
	t.activeJobsCond = sync.NewCond(&t.activeJobsLock)
	return t
}
// pathAncestors lists every directory above p, nearest first, by applying
// path.Dir repeatedly until the result stops changing.
// Example: "/a/b/c" yields ["/a/b", "/a", "/"]; "/" yields nil.
func pathAncestors(p util.FullPath) []util.FullPath {
	var chain []util.FullPath
	cur := string(p)
	// path.Dir is a fixed point at the root, which terminates the walk.
	for up := path.Dir(cur); up != cur; up = path.Dir(cur) {
		chain = append(chain, util.FullPath(up))
		cur = up
	}
	return chain
}
// addPathToIndex records one more active job at p in the conflict indexes:
// the exact-path counter for its kind (file or directory) is incremented, and
// so is descendantCount for every ancestor directory of p.
// The caller must hold activeJobsLock.
func (t *MetadataProcessor) addPathToIndex(p util.FullPath, isDirectory bool) {
	exact := t.activeFilePaths
	if isDirectory {
		exact = t.activeDirPaths
	}
	exact[p]++

	for _, dir := range pathAncestors(p) {
		t.descendantCount[dir]++
	}
}
// removePathFromIndex undoes one addPathToIndex call for p: it drops one
// reference from the exact-path counter of the matching kind and from
// descendantCount of every ancestor directory. Counters that would reach
// zero are deleted so the maps do not accumulate dead keys.
// The caller must hold activeJobsLock.
func (t *MetadataProcessor) removePathFromIndex(p util.FullPath, isDirectory bool) {
	// release drops one reference to key and removes the entry entirely once
	// no references remain (a missing key is simply left missing).
	release := func(counts map[util.FullPath]int, key util.FullPath) {
		if counts[key] > 1 {
			counts[key]--
			return
		}
		delete(counts, key)
	}

	if isDirectory {
		release(t.activeDirPaths, p)
	} else {
		release(t.activeFilePaths, p)
	}

	for _, dir := range pathAncestors(p) {
		release(t.descendantCount, dir)
	}
}
// pathConflicts reports whether a job on p would collide with a job that is
// currently active, using the same rules as pairShouldWaitFor:
//   - file vs file: identical path
//   - file vs dir:  the file lies under the directory
//   - dir vs file:  the file lies under the directory
//   - dir vs dir:   one directory lies under the other
//
// Note that activeDirPaths[p] itself is never consulted, so two directory
// jobs on the identical path are not reported as conflicting.
// It reads the indexes and therefore has to run under activeJobsLock.
func (t *MetadataProcessor) pathConflicts(p util.FullPath, isDirectory bool) bool {
	switch {
	case isDirectory && t.descendantCount[p] > 0:
		// Something (file or directory) is active strictly below this directory.
		return true
	case !isDirectory && t.activeFilePaths[p] > 0:
		// The very same file is already being processed.
		return true
	}

	// Finally, look for an active directory job on any ancestor of p.
	for _, dir := range pathAncestors(p) {
		if t.activeDirPaths[dir] > 0 {
			return true
		}
	}
	return false
}
// conflictsWith reports whether the event in resp touches a path that
// collides with an active job. The primary path is checked first; the rename
// target is checked only when the event has one (newPath is non-empty).
// It reads the indexes and therefore has to run under activeJobsLock.
func (t *MetadataProcessor) conflictsWith(resp *filer_pb.SubscribeMetadataResponse) bool {
	p, newPath, isDirectory := extractPathsFromMetadata(resp)
	return t.pathConflicts(p, isDirectory) ||
		(newPath != "" && t.pathConflicts(newPath, isDirectory))
}
// AddSyncJob schedules the event in resp for asynchronous processing.
//
// Empty responses (per filer_pb.IsEmpty) are ignored. Otherwise the call
// blocks until fewer than concurrencyLimit jobs are active and no active job
// conflicts with the paths touched by resp. It then registers the job in
// activeJobs, the path indexes and the timestamp heap, and starts a goroutine
// that runs t.fn(resp) through util.Retry. An error returned by util.Retry is
// logged; the job is treated as finished either way.
//
// When a job finishes, its bookkeeping is removed, the processed-timestamp
// watermark is advanced if possible, and all waiting callers are woken.
func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse) {
	if filer_pb.IsEmpty(resp) {
		return
	}

	t.activeJobsLock.Lock()
	defer t.activeJobsLock.Unlock()

	// Wait for a free slot and for every conflicting job to finish.
	for len(t.activeJobs) >= t.concurrencyLimit || t.conflictsWith(resp) {
		t.activeJobsCond.Wait()
	}

	p, newPath, isDirectory := extractPathsFromMetadata(resp)
	jobPaths := &syncJobPaths{path: p, newPath: newPath, isDirectory: isDirectory}
	t.activeJobs[resp.TsNs] = jobPaths
	t.addPathToIndex(p, isDirectory)
	if newPath != "" {
		t.addPathToIndex(newPath, isDirectory)
	}
	heap.Push(&t.tsHeap, resp.TsNs)

	go func() {
		if err := util.Retry("metadata processor", func() error {
			return t.fn(resp)
		}); err != nil {
			glog.Errorf("process %v: %v", resp, err)
		}

		t.activeJobsLock.Lock()
		defer t.activeJobsLock.Unlock()

		delete(t.activeJobs, resp.TsNs)
		t.removePathFromIndex(jobPaths.path, jobPaths.isDirectory)
		if jobPaths.newPath != "" {
			t.removePathFromIndex(jobPaths.newPath, jobPaths.isDirectory)
		}

		// Lazy-clean completed entries from the heap top. Each entry is pushed
		// once and popped once: O(log n) amortized. Entries come off in
		// ascending order and the loop stops at the oldest job still active,
		// so the last popped timestamp is the newest one for which every job
		// at or below it has finished.
		var doneTsNs int64
		advanced := false
		for t.tsHeap.Len() > 0 {
			if _, active := t.activeJobs[t.tsHeap[0]]; active {
				break
			}
			doneTsNs = heap.Pop(&t.tsHeap).(int64)
			advanced = true
		}
		// Advance to the last popped timestamp rather than to resp.TsNs: when
		// the oldest job finishes after newer ones, resp.TsNs alone would leave
		// the watermark behind work that is already complete.
		if advanced {
			t.processedTsWatermark.Store(doneTsNs)
		}

		// Waiters may be blocked for different reasons (capacity or a path
		// conflict), so wake all of them and let each re-check its condition.
		t.activeJobsCond.Broadcast()
	}()
}
// extractPathsFromMetadata derives the path(s) and directory flag of the
// entry an event refers to.
//
//   - create: p is resp.Directory/NewEntry.Name, flag from NewEntry
//   - delete: p is resp.Directory/OldEntry.Name, flag from OldEntry
//   - update: p is resp.Directory/NewEntry.Name, flag from NewEntry
//   - anything else is handled as a rename: p is the old location, newPath is
//     NewParentPath/NewEntry.Name, flag from OldEntry
//
// newPath is empty for every case except the rename.
func extractPathsFromMetadata(resp *filer_pb.SubscribeMetadataResponse) (p, newPath util.FullPath, isDirectory bool) {
	event := resp.EventNotification
	oldEntry, newEntry := event.OldEntry, event.NewEntry
	dir := util.FullPath(resp.Directory)

	switch {
	case filer_pb.IsCreate(resp):
		return dir.Child(newEntry.Name), "", newEntry.IsDirectory
	case filer_pb.IsDelete(resp):
		return dir.Child(oldEntry.Name), "", oldEntry.IsDirectory
	case filer_pb.IsUpdate(resp):
		return dir.Child(newEntry.Name), "", newEntry.IsDirectory
	}

	// Not a create, delete or update: the entry moved to a new location.
	target := util.FullPath(event.NewParentPath).Child(newEntry.Name)
	return dir.Child(oldEntry.Name), target, oldEntry.IsDirectory
}