148 lines
3.8 KiB

package command
import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"sync"
"sync/atomic"
)
// MetadataProcessor runs metadata events through fn on background goroutines
// and tracks the events that are currently in flight.
type MetadataProcessor struct {
// activeJobs holds the events currently being processed, keyed by their TsNs.
activeJobs map[int64]*filer_pb.SubscribeMetadataResponse
// activeJobsLock guards activeJobs.
activeJobsLock sync.Mutex
// activeJobsCond is bound to activeJobsLock; AddSyncJob waits on it until a
// slot is free and no active job conflicts with the event being added.
activeJobsCond *sync.Cond
// concurrencyLimit is the number of active jobs at which AddSyncJob starts
// waiting.
concurrencyLimit int
// fn is the callback invoked, through util.Retry, for every scheduled event.
fn pb.ProcessMetadataFunc
// processedTsWatermark starts at the offset passed to NewMetadataProcessor
// and is set to a finished job's TsNs when no remaining active job has a
// smaller TsNs.
processedTsWatermark atomic.Int64
}
// NewMetadataProcessor builds a MetadataProcessor that runs fn for each
// scheduled event, allows at most concurrency events in flight, and starts
// with the processed-timestamp watermark set to offsetTsNs.
//
// A concurrency below 1 is raised to 1: AddSyncJob waits while
// len(activeJobs) >= concurrencyLimit, so a zero or negative limit would make
// the very first call block forever, with no running job left to wake it.
func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int, offsetTsNs int64) *MetadataProcessor {
	if concurrency < 1 {
		concurrency = 1
	}

	t := &MetadataProcessor{
		fn:               fn,
		activeJobs:       make(map[int64]*filer_pb.SubscribeMetadataResponse),
		concurrencyLimit: concurrency,
	}
	t.processedTsWatermark.Store(offsetTsNs)
	// The condition variable shares the mutex that guards activeJobs.
	t.activeJobsCond = sync.NewCond(&t.activeJobsLock)
	return t
}
// AddSyncJob schedules resp for asynchronous processing by t.fn.
//
// Empty events are ignored. Otherwise the call blocks until fewer than
// concurrencyLimit jobs are active and none of them conflicts with resp (see
// conflictsWith), then registers resp under its TsNs and processes it on a new
// goroutine. An error that survives util.Retry is logged, and the job is still
// treated as finished.
func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse) {
	if filer_pb.IsEmpty(resp) {
		return
	}

	t.activeJobsLock.Lock()
	defer t.activeJobsLock.Unlock()

	// Cond.Wait releases the lock while blocked and re-acquires it before
	// returning, so the predicate is always re-checked under the lock.
	for len(t.activeJobs) >= t.concurrencyLimit || t.conflictsWith(resp) {
		t.activeJobsCond.Wait()
	}
	t.activeJobs[resp.TsNs] = resp

	go func() {
		if err := util.Retry("metadata processor", func() error {
			return t.fn(resp)
		}); err != nil {
			glog.Errorf("process %v: %v", resp, err)
		}

		t.activeJobsLock.Lock()
		defer t.activeJobsLock.Unlock()

		delete(t.activeJobs, resp.TsNs)

		// Record the watermark only when no still-active job is older than
		// the one that just finished.
		isOldest := true
		for activeTsNs := range t.activeJobs {
			if resp.TsNs > activeTsNs {
				isOldest = false
				break
			}
		}
		if isOldest {
			t.processedTsWatermark.Store(resp.TsNs)
		}

		// Every waiter has its own predicate (it depends on the waiter's
		// resp), so wake them all: a single Signal could land on a waiter
		// that is still blocked while one that could proceed keeps sleeping.
		t.activeJobsCond.Broadcast()
	}()
}
// conflictsWith reports whether resp must wait for at least one of the
// currently active jobs, as decided by shouldWaitFor. It reads activeJobs
// without taking the lock itself; the caller in this file, AddSyncJob, holds
// activeJobsLock while calling it.
func (t *MetadataProcessor) conflictsWith(resp *filer_pb.SubscribeMetadataResponse) bool {
	mustWait := false
	for _, active := range t.activeJobs {
		if mustWait = shouldWaitFor(resp, active); mustWait {
			break
		}
	}
	return mustWait
}
// shouldWaitFor reports whether a, a job that is a candidate for scheduling,
// has to wait for b, a job that is already active.
//
// Each job contributes its path and, when non-empty, its newPath as returned
// by extractPathsFromMetadata. a waits when any of its paths conflicts with
// any of b's paths according to pairShouldWaitFor.
func shouldWaitFor(a *filer_pb.SubscribeMetadataResponse, b *filer_pb.SubscribeMetadataResponse) bool {
	aPath, aNewPath, aIsDirectory := extractPathsFromMetadata(a)
	bPath, bNewPath, bIsDirectory := extractPathsFromMetadata(b)

	aMoved := aNewPath != ""
	bMoved := bNewPath != ""
	clash := func(candidate, active util.FullPath) bool {
		return pairShouldWaitFor(candidate, active, aIsDirectory, bIsDirectory)
	}

	// Short-circuit evaluation keeps the checks in the order: path/path,
	// newPath/path, path/newPath, newPath/newPath.
	return clash(aPath, bPath) ||
		(aMoved && clash(aNewPath, bPath)) ||
		(bMoved && clash(aPath, bNewPath)) ||
		(aMoved && bMoved && clash(aNewPath, bNewPath))
}
// pairShouldWaitFor reports whether an event on aPath must wait for an active
// event on bPath, given whether each event's entry is a directory.
//
//   - identical paths always conflict;
//   - two directories conflict when either one is under the other;
//   - a file and a directory conflict when the file is under the directory;
//   - two files conflict only on identical paths.
//
// The equality check comes first so that two events on the very same path are
// serialised whatever the entry types, independent of how IsUnder treats a
// path compared with itself.
// NOTE(review): IsUnder is defined outside this file; if it already reports
// true for equal paths, the equality check is merely redundant.
func pairShouldWaitFor(aPath, bPath util.FullPath, aIsDirectory, bIsDirectory bool) bool {
	switch {
	case aPath == bPath:
		return true
	case aIsDirectory && bIsDirectory:
		return aPath.IsUnder(bPath) || bPath.IsUnder(aPath)
	case bIsDirectory:
		return aPath.IsUnder(bPath)
	case aIsDirectory:
		return bPath.IsUnder(aPath)
	default:
		// Two files with different paths never conflict.
		return false
	}
}
// extractPathsFromMetadata derives the path(s) touched by resp and whether the
// affected entry is a directory.
//
//   - create or update: path is resp.Directory joined with the new entry's
//     name, and isDirectory comes from the new entry;
//   - delete: path is resp.Directory joined with the old entry's name, and
//     isDirectory comes from the old entry;
//   - anything else is handled as a rename: path is the old location, newPath
//     is NewParentPath joined with the new entry's name, and isDirectory comes
//     from the old entry.
//
// newPath is empty in every case except the rename one.
func extractPathsFromMetadata(resp *filer_pb.SubscribeMetadataResponse) (path, newPath util.FullPath, isDirectory bool) {
	oldEntry, newEntry := resp.EventNotification.OldEntry, resp.EventNotification.NewEntry
	parent := util.FullPath(resp.Directory)

	switch {
	case filer_pb.IsCreate(resp):
		return parent.Child(newEntry.Name), "", newEntry.IsDirectory
	case filer_pb.IsDelete(resp):
		return parent.Child(oldEntry.Name), "", oldEntry.IsDirectory
	case filer_pb.IsUpdate(resp):
		return parent.Child(newEntry.Name), "", newEntry.IsDirectory
	default:
		// renaming
		return parent.Child(oldEntry.Name),
			util.FullPath(resp.EventNotification.NewParentPath).Child(newEntry.Name),
			oldEntry.IsDirectory
	}
}