You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							148 lines
						
					
					
						
							3.8 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							148 lines
						
					
					
						
							3.8 KiB
						
					
					
				| package command | |
| 
 | |
| import ( | |
| 	"github.com/seaweedfs/seaweedfs/weed/glog" | |
| 	"github.com/seaweedfs/seaweedfs/weed/pb" | |
| 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" | |
| 	"github.com/seaweedfs/seaweedfs/weed/util" | |
| 	"sync" | |
| 	"sync/atomic" | |
| ) | |
| 
 | |
| type MetadataProcessor struct { | |
| 	activeJobs           map[int64]*filer_pb.SubscribeMetadataResponse | |
| 	activeJobsLock       sync.Mutex | |
| 	activeJobsCond       *sync.Cond | |
| 	concurrencyLimit     int | |
| 	fn                   pb.ProcessMetadataFunc | |
| 	processedTsWatermark atomic.Int64 | |
| } | |
| 
 | |
| func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int, offsetTsNs int64) *MetadataProcessor { | |
| 	t := &MetadataProcessor{ | |
| 		fn:               fn, | |
| 		activeJobs:       make(map[int64]*filer_pb.SubscribeMetadataResponse), | |
| 		concurrencyLimit: concurrency, | |
| 	} | |
| 	t.processedTsWatermark.Store(offsetTsNs) | |
| 	t.activeJobsCond = sync.NewCond(&t.activeJobsLock) | |
| 	return t | |
| } | |
| 
 | |
| func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse) { | |
| 	if filer_pb.IsEmpty(resp) { | |
| 		return | |
| 	} | |
| 
 | |
| 	t.activeJobsLock.Lock() | |
| 	defer t.activeJobsLock.Unlock() | |
| 
 | |
| 	for len(t.activeJobs) >= t.concurrencyLimit || t.conflictsWith(resp) { | |
| 		t.activeJobsCond.Wait() | |
| 	} | |
| 	t.activeJobs[resp.TsNs] = resp | |
| 	go func() { | |
| 
 | |
| 		if err := util.Retry("metadata processor", func() error { | |
| 			return t.fn(resp) | |
| 		}); err != nil { | |
| 			glog.Errorf("process %v: %v", resp, err) | |
| 		} | |
| 
 | |
| 		t.activeJobsLock.Lock() | |
| 		defer t.activeJobsLock.Unlock() | |
| 
 | |
| 		delete(t.activeJobs, resp.TsNs) | |
| 
 | |
| 		// if is the oldest job, write down the watermark | |
| 		isOldest := true | |
| 		for t := range t.activeJobs { | |
| 			if resp.TsNs > t { | |
| 				isOldest = false | |
| 				break | |
| 			} | |
| 		} | |
| 		if isOldest { | |
| 			t.processedTsWatermark.Store(resp.TsNs) | |
| 		} | |
| 		t.activeJobsCond.Signal() | |
| 	}() | |
| } | |
| 
 | |
| func (t *MetadataProcessor) conflictsWith(resp *filer_pb.SubscribeMetadataResponse) bool { | |
| 	for _, r := range t.activeJobs { | |
| 		if shouldWaitFor(resp, r) { | |
| 			return true | |
| 		} | |
| 	} | |
| 	return false | |
| } | |
| 
 | |
| // a is one possible job to schedule | |
| // b is one existing active job | |
| func shouldWaitFor(a *filer_pb.SubscribeMetadataResponse, b *filer_pb.SubscribeMetadataResponse) bool { | |
| 	aPath, aNewPath, aIsDirectory := extractPathsFromMetadata(a) | |
| 	bPath, bNewPath, bIsDirectory := extractPathsFromMetadata(b) | |
| 
 | |
| 	if pairShouldWaitFor(aPath, bPath, aIsDirectory, bIsDirectory) { | |
| 		return true | |
| 	} | |
| 	if aNewPath != "" { | |
| 		if pairShouldWaitFor(aNewPath, bPath, aIsDirectory, bIsDirectory) { | |
| 			return true | |
| 		} | |
| 	} | |
| 	if bNewPath != "" { | |
| 		if pairShouldWaitFor(aPath, bNewPath, aIsDirectory, bIsDirectory) { | |
| 			return true | |
| 		} | |
| 	} | |
| 	if aNewPath != "" && bNewPath != "" { | |
| 		if pairShouldWaitFor(aNewPath, bNewPath, aIsDirectory, bIsDirectory) { | |
| 			return true | |
| 		} | |
| 	} | |
| 	return false | |
| } | |
| 
 | |
| func pairShouldWaitFor(aPath, bPath util.FullPath, aIsDirectory, bIsDirectory bool) bool { | |
| 	if bIsDirectory { | |
| 		if aIsDirectory { | |
| 			return aPath.IsUnder(bPath) || bPath.IsUnder(aPath) | |
| 		} else { | |
| 			return aPath.IsUnder(bPath) | |
| 		} | |
| 	} else { | |
| 		if aIsDirectory { | |
| 			return bPath.IsUnder(aPath) | |
| 		} else { | |
| 			return aPath == bPath | |
| 		} | |
| 	} | |
| } | |
| 
 | |
| func extractPathsFromMetadata(resp *filer_pb.SubscribeMetadataResponse) (path, newPath util.FullPath, isDirectory bool) { | |
| 	oldEntry := resp.EventNotification.OldEntry | |
| 	newEntry := resp.EventNotification.NewEntry | |
| 	// create | |
| 	if filer_pb.IsCreate(resp) { | |
| 		path = util.FullPath(resp.Directory).Child(newEntry.Name) | |
| 		isDirectory = newEntry.IsDirectory | |
| 		return | |
| 	} | |
| 	if filer_pb.IsDelete(resp) { | |
| 		path = util.FullPath(resp.Directory).Child(oldEntry.Name) | |
| 		isDirectory = oldEntry.IsDirectory | |
| 		return | |
| 	} | |
| 	if filer_pb.IsUpdate(resp) { | |
| 		path = util.FullPath(resp.Directory).Child(newEntry.Name) | |
| 		isDirectory = newEntry.IsDirectory | |
| 		return | |
| 	} | |
| 	// renaming | |
| 	path = util.FullPath(resp.Directory).Child(oldEntry.Name) | |
| 	isDirectory = oldEntry.IsDirectory | |
| 	newPath = util.FullPath(resp.EventNotification.NewParentPath).Child(newEntry.Name) | |
| 	return | |
| }
 |