diff --git a/weed/command/filer_remote_gateway_buckets.go b/weed/command/filer_remote_gateway_buckets.go index 97147766e..5d138e494 100644 --- a/weed/command/filer_remote_gateway_buckets.go +++ b/weed/command/filer_remote_gateway_buckets.go @@ -164,7 +164,7 @@ func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *sour handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error { message := resp.EventNotification - if message.NewEntry != nil { + if metadataEventUpdatesDirectory(resp, filer.DirectoryEtcRemote) { // update if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE { newMappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content) @@ -180,8 +180,11 @@ func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *sour } option.remoteConfs[conf.Name] = conf } - } else if message.OldEntry != nil { + } else if metadataEventRemovesFromDirectory(resp, filer.DirectoryEtcRemote) { // deletion + if message.OldEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE { + option.mappings = &remote_pb.RemoteStorageMapping{} + } if strings.HasSuffix(message.OldEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) { conf := &remote_pb.RemoteConf{} if err := proto.Unmarshal(message.OldEntry.Content, conf); err != nil { @@ -196,7 +199,8 @@ func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *sour eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error { message := resp.EventNotification - if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) { + sourceInEtcRemote, targetInEtcRemote := metadataEventDirectoryMembership(resp, filer.DirectoryEtcRemote) + if sourceInEtcRemote || targetInEtcRemote { return handleEtcRemoteChanges(resp) } diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go index 60f1f8f5b..cc03389df 100644 --- a/weed/command/filer_remote_sync_dir.go +++ b/weed/command/filer_remote_sync_dir.go @@ -92,34 +92,38 @@ func (option *RemoteSyncOptions) makeEventProcessor(remoteStorage *remote_pb.Rem handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error { message := resp.EventNotification - if message.NewEntry == nil { - return nil - } - if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE { - mappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content) - if readErr != nil { - return fmt.Errorf("unmarshal mappings: %w", readErr) + if metadataEventUpdatesDirectory(resp, filer.DirectoryEtcRemote) { + if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE { + mappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content) + if readErr != nil { + return fmt.Errorf("unmarshal mappings: %w", readErr) + } + if remoteLoc, found := mappings.Mappings[mountedDir]; found { + if remoteStorageMountLocation.Bucket != remoteLoc.Bucket || remoteStorageMountLocation.Path != remoteLoc.Path { + glog.Fatalf("Unexpected mount changes %+v => %+v", remoteStorageMountLocation, remoteLoc) + } + } else { + glog.V(0).Infof("unmounted %s exiting ...", mountedDir) + os.Exit(0) + } } - if remoteLoc, found := mappings.Mappings[mountedDir]; found { - if remoteStorageMountLocation.Bucket != remoteLoc.Bucket || remoteStorageMountLocation.Path != remoteLoc.Path { - glog.Fatalf("Unexpected mount changes %+v => %+v", remoteStorageMountLocation, remoteLoc) + if message.NewEntry.Name == remoteStorage.Name+filer.REMOTE_STORAGE_CONF_SUFFIX { + conf := &remote_pb.RemoteConf{} + if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil { + return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err) + } + remoteStorage = conf + if newClient, err := remote_storage.GetRemoteStorage(remoteStorage); err == nil { + client = newClient + } else { + return err } - } else { - glog.V(0).Infof("unmounted %s exiting ...", mountedDir) - os.Exit(0) } } - if message.NewEntry.Name == remoteStorage.Name+filer.REMOTE_STORAGE_CONF_SUFFIX { - conf := &remote_pb.RemoteConf{} - if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil { - return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err) - } - remoteStorage = conf - if newClient, err := remote_storage.GetRemoteStorage(remoteStorage); err == nil { - client = newClient - } else { - return err - } + if metadataEventRemovesFromDirectory(resp, filer.DirectoryEtcRemote) && + message.OldEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE { + glog.V(0).Infof("unmounted %s exiting ...", mountedDir) + os.Exit(0) } return nil @@ -127,7 +131,8 @@ func (option *RemoteSyncOptions) makeEventProcessor(remoteStorage *remote_pb.Rem eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error { message := resp.EventNotification - if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) { + sourceInEtcRemote, targetInEtcRemote := metadataEventDirectoryMembership(resp, filer.DirectoryEtcRemote) + if sourceInEtcRemote || targetInEtcRemote { return handleEtcRemoteChanges(resp) } diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 9bdcd6022..2e3824d72 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -26,37 +26,37 @@ import ( ) type SyncOptions struct { - isActivePassive *bool - filerA *string - filerB *string - aPath *string - aExcludePaths *string - bPath *string - bExcludePaths *string - aReplication *string - bReplication *string - aCollection *string - bCollection *string - aTtlSec *int - bTtlSec *int - aDiskType *string - bDiskType *string - aDebug *bool - bDebug *bool - aFromTsMs *int64 - bFromTsMs *int64 - aProxyByFiler *bool - bProxyByFiler *bool - metricsHttpIp *string - metricsHttpPort *int + isActivePassive *bool + filerA *string + filerB *string + aPath *string + aExcludePaths *string + bPath *string + bExcludePaths *string + aReplication *string + bReplication *string + aCollection *string + bCollection *string + aTtlSec *int + bTtlSec *int + aDiskType *string + bDiskType *string + aDebug *bool + bDebug *bool + aFromTsMs *int64 + bFromTsMs *int64 + aProxyByFiler *bool + bProxyByFiler *bool + metricsHttpIp *string + metricsHttpPort *int concurrency *int chunkConcurrency *int aDoDeleteFiles *bool - bDoDeleteFiles *bool - clientId int32 - clientEpoch atomic.Int32 - debug *bool - debugPort *int + bDoDeleteFiles *bool + clientId int32 + clientEpoch atomic.Int32 + debug *bool + debugPort *int } const ( @@ -445,12 +445,17 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { message := resp.EventNotification + // Derive the target (new-side) directory once. MetadataEventTargetDirectory + // returns NewParentPath when set, falling back to resp.Directory for + // delete events or legacy events with an empty NewParentPath. + targetDir := filer_pb.MetadataEventTargetDirectory(resp) + var sourceOldKey, sourceNewKey util.FullPath if message.OldEntry != nil { sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name) } if message.NewEntry != nil { - sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name) + sourceNewKey = util.FullPath(targetDir).Child(message.NewEntry.Name) } if debug { @@ -461,19 +466,24 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str return nil } - if !strings.HasPrefix(resp.Directory+"/", sourcePath) { + // For rename events the key/directory is the old (source) path. + // Check both old and new directories so cross-boundary renames + // are not silently dropped. The downstream old/new key handling + // (lines below) already converts these to create or delete. + oldDirExcluded := matchesExcludePath(resp.Directory, excludePaths) + newDirExcluded := matchesExcludePath(targetDir, excludePaths) + oldDirInScope := util.IsEqualOrUnder(resp.Directory, sourcePath) && !oldDirExcluded + newDirInScope := message.NewEntry != nil && + util.IsEqualOrUnder(targetDir, sourcePath) && + !newDirExcluded + if !oldDirInScope && !newDirInScope { return nil } - for _, excludePath := range excludePaths { - if strings.HasPrefix(resp.Directory+"/", excludePath) { - return nil - } - } // Compute per-side exclusion so that rename events crossing an // exclude boundary are handled as delete + create rather than // being entirely skipped. - oldExcluded := isEntryExcluded(resp.Directory, message.OldEntry, reExcludeFileName, excludeFileNames, excludePathPatterns) - newExcluded := isEntryExcluded(message.NewParentPath, message.NewEntry, reExcludeFileName, excludeFileNames, excludePathPatterns) + oldExcluded := oldDirExcluded || isEntryExcluded(resp.Directory, message.OldEntry, reExcludeFileName, excludeFileNames, excludePathPatterns) + newExcluded := newDirExcluded || isEntryExcluded(targetDir, message.NewEntry, reExcludeFileName, excludeFileNames, excludePathPatterns) if oldExcluded && newExcluded { return nil @@ -495,7 +505,7 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str if !doDeleteFiles { return nil } - if !strings.HasPrefix(string(sourceOldKey), sourcePath) { + if !util.IsEqualOrUnder(string(sourceOldKey), sourcePath) { return nil } key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath) @@ -504,7 +514,7 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str // handle new entries if filer_pb.IsCreate(resp) { - if !strings.HasPrefix(string(sourceNewKey), sourcePath) { + if !util.IsEqualOrUnder(string(sourceNewKey), sourcePath) { return nil } key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath) @@ -521,18 +531,19 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str } // handle updates - if strings.HasPrefix(string(sourceOldKey), sourcePath) { + if util.IsEqualOrUnder(string(sourceOldKey), sourcePath) { // old key is in the watched directory - if strings.HasPrefix(string(sourceNewKey), sourcePath) { + if util.IsEqualOrUnder(string(sourceNewKey), sourcePath) { // new key is also in the watched directory if doDeleteFiles { oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):]) + var sinkNewParentPath string if strings.HasSuffix(sourcePath, "/") { - message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath)-1:]) + sinkNewParentPath = util.Join(targetPath, targetDir[len(sourcePath)-1:]) } else { - message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):]) + sinkNewParentPath = util.Join(targetPath, targetDir[len(sourcePath):]) } - foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures) + foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, sinkNewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures) if foundExisting { return err } @@ -559,7 +570,7 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str } } else { // old key is outside the watched directory - if strings.HasPrefix(string(sourceNewKey), sourcePath) { + if util.IsEqualOrUnder(string(sourceNewKey), sourcePath) { // new key is in the watched directory key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath) if err := dataSink.CreateEntry(key, message.NewEntry, message.Signatures); err != nil { @@ -623,6 +634,15 @@ func isEntryExcluded(dir string, entry *filer_pb.Entry, reExcludeFileName *regex return false } +func matchesExcludePath(dir string, excludePaths []string) bool { + for _, excludePath := range excludePaths { + if util.IsEqualOrUnder(dir, excludePath) { + return true + } + } + return false +} + // compileExcludePattern compiles a regexp pattern string, returning nil if empty. func compileExcludePattern(pattern string, label string) (*regexp.Regexp, error) { if pattern == "" { diff --git a/weed/command/filer_sync_process_test.go b/weed/command/filer_sync_process_test.go new file mode 100644 index 000000000..5febfb3d6 --- /dev/null +++ b/weed/command/filer_sync_process_test.go @@ -0,0 +1,121 @@ +package command + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/replication/sink" + "github.com/seaweedfs/seaweedfs/weed/replication/source" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +var _ sink.ReplicationSink = (*recordingSyncSink)(nil) + +type recordingSyncSink struct { + deleteKeys []string + createKeys []string + updateKeys []string +} + +func (s *recordingSyncSink) GetName() string { return "recording" } +func (s *recordingSyncSink) Initialize(util.Configuration, string) error { + return nil +} +func (s *recordingSyncSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error { + s.deleteKeys = append(s.deleteKeys, key) + return nil +} +func (s *recordingSyncSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error { + s.createKeys = append(s.createKeys, key) + return nil +} +func (s *recordingSyncSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (bool, error) { + s.updateKeys = append(s.updateKeys, key) + return true, nil +} +func (s *recordingSyncSink) GetSinkToDirectory() string { return "/dest" } +func (s *recordingSyncSink) SetSourceFiler(*source.FilerSource) {} +func (s *recordingSyncSink) IsIncremental() bool { return false } + +func TestPathIsEqualOrUnderUsesDirectoryBoundaries(t *testing.T) { + tests := []struct { + name string + candidate string + other string + expected bool + }{ + {name: "equal", candidate: "/foo", other: "/foo", expected: true}, + {name: "descendant", candidate: "/foo/bar", other: "/foo", expected: true}, + {name: "sibling prefix", candidate: "/foobar/bar", other: "/foo", expected: false}, + {name: "root", candidate: "/foo/bar", other: "/", expected: true}, + {name: "empty", candidate: "", other: "/foo", expected: false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := util.IsEqualOrUnder(tt.candidate, tt.other); got != tt.expected { + t.Fatalf("IsEqualOrUnder(%q, %q) = %v, want %v", tt.candidate, tt.other, got, tt.expected) + } + }) + } +} + +func TestMatchesExcludePathUsesDirectoryBoundaries(t *testing.T) { + if !matchesExcludePath("/tmp", []string{"/tmp"}) { + t.Fatal("expected exact directory match to be excluded") + } + if !matchesExcludePath("/tmp/sub", []string{"/tmp"}) { + t.Fatal("expected descendant directory to be excluded") + } + if matchesExcludePath("/tmp2/sub", []string{"/tmp"}) { + t.Fatal("did not expect sibling directory to be excluded") + } +} + +func TestGenProcessFunctionRenameToSiblingPrefixBecomesDelete(t *testing.T) { + dataSink := &recordingSyncSink{} + processFn := genProcessFunction("/foo", "/dest", nil, nil, nil, nil, dataSink, true, false) + + err := processFn(&filer_pb.SubscribeMetadataResponse{ + Directory: "/foo/dir", + EventNotification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{Name: "file.txt"}, + NewEntry: &filer_pb.Entry{Name: "file.txt"}, + NewParentPath: "/foobar/dir", + }, + }) + if err != nil { + t.Fatalf("processFn rename to sibling prefix: %v", err) + } + + if len(dataSink.deleteKeys) != 1 || dataSink.deleteKeys[0] != "/dest/dir/file.txt" { + t.Fatalf("delete keys = %v, want [/dest/dir/file.txt]", dataSink.deleteKeys) + } + if len(dataSink.createKeys) != 0 || len(dataSink.updateKeys) != 0 { + t.Fatalf("unexpected create/update calls: creates=%v updates=%v", dataSink.createKeys, dataSink.updateKeys) + } +} + +func TestGenProcessFunctionRenameFromExcludedDirBecomesCreate(t *testing.T) { + dataSink := &recordingSyncSink{} + processFn := genProcessFunction("/foo", "/dest", []string{"/foo/excluded"}, nil, nil, nil, dataSink, true, false) + + err := processFn(&filer_pb.SubscribeMetadataResponse{ + Directory: "/foo/excluded", + EventNotification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{Name: "file.txt"}, + NewEntry: &filer_pb.Entry{Name: "file.txt"}, + NewParentPath: "/foo/live", + }, + }) + if err != nil { + t.Fatalf("processFn rename from excluded dir: %v", err) + } + + if len(dataSink.createKeys) != 1 || dataSink.createKeys[0] != "/dest/live/file.txt" { + t.Fatalf("create keys = %v, want [/dest/live/file.txt]", dataSink.createKeys) + } + if len(dataSink.deleteKeys) != 0 || len(dataSink.updateKeys) != 0 { + t.Fatalf("unexpected delete/update calls: deletes=%v updates=%v", dataSink.deleteKeys, dataSink.updateKeys) + } +} diff --git a/weed/command/metadata_event_scope.go b/weed/command/metadata_event_scope.go new file mode 100644 index 000000000..4168ba4a4 --- /dev/null +++ b/weed/command/metadata_event_scope.go @@ -0,0 +1,36 @@ +package command + +import ( + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +func metadataEventDirectoryMembership(resp *filer_pb.SubscribeMetadataResponse, dir string) (sourceInDir, targetInDir bool) { + if resp == nil || resp.EventNotification == nil { + return false, false + } + + sourceInDir = util.IsEqualOrUnder(resp.Directory, dir) + targetInDir = resp.EventNotification.NewEntry != nil && + util.IsEqualOrUnder(filer_pb.MetadataEventTargetDirectory(resp), dir) + + return sourceInDir, targetInDir +} + +func metadataEventUpdatesDirectory(resp *filer_pb.SubscribeMetadataResponse, dir string) bool { + if resp == nil || resp.EventNotification == nil || resp.EventNotification.NewEntry == nil { + return false + } + + _, targetInDir := metadataEventDirectoryMembership(resp, dir) + return targetInDir +} + +func metadataEventRemovesFromDirectory(resp *filer_pb.SubscribeMetadataResponse, dir string) bool { + if resp == nil || resp.EventNotification == nil || resp.EventNotification.OldEntry == nil { + return false + } + + sourceInDir, targetInDir := metadataEventDirectoryMembership(resp, dir) + return sourceInDir && !targetInDir +} diff --git a/weed/command/metadata_event_scope_test.go b/weed/command/metadata_event_scope_test.go new file mode 100644 index 000000000..42a5aeadf --- /dev/null +++ b/weed/command/metadata_event_scope_test.go @@ -0,0 +1,116 @@ +package command + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" +) + +func TestMetadataEventDirectoryMembershipUsesDirectoryBoundaries(t *testing.T) { + resp := &filer_pb.SubscribeMetadataResponse{ + Directory: filer.DirectoryEtcRemote, + EventNotification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{Name: "remote.conf"}, + NewEntry: &filer_pb.Entry{Name: "remote.conf"}, + NewParentPath: "/etc/remote-sibling", + }, + } + + sourceInDir, targetInDir := metadataEventDirectoryMembership(resp, filer.DirectoryEtcRemote) + if !sourceInDir { + t.Fatal("expected source directory to match") + } + if targetInDir { + t.Fatal("did not expect sibling target directory to match") + } +} + +func TestMetadataEventUpdatesAndRemovesDirectory(t *testing.T) { + tests := []struct { + name string + resp *filer_pb.SubscribeMetadataResponse + wantUpdate bool + wantRemoval bool + }{ + { + name: "nil response", + resp: nil, + wantUpdate: false, + wantRemoval: false, + }, + { + name: "create event", + resp: &filer_pb.SubscribeMetadataResponse{ + Directory: filer.DirectoryEtcRemote, + EventNotification: &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{Name: "new.conf"}, + NewParentPath: filer.DirectoryEtcRemote, + }, + }, + wantUpdate: true, + wantRemoval: false, + }, + { + name: "rename out", + resp: &filer_pb.SubscribeMetadataResponse{ + Directory: filer.DirectoryEtcRemote, + EventNotification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{Name: "remote.conf"}, + NewEntry: &filer_pb.Entry{Name: "remote.conf"}, + NewParentPath: "/tmp", + }, + }, + wantUpdate: false, + wantRemoval: true, + }, + { + name: "rename into", + resp: &filer_pb.SubscribeMetadataResponse{ + Directory: "/tmp", + EventNotification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{Name: "remote.conf"}, + NewEntry: &filer_pb.Entry{Name: "remote.conf"}, + NewParentPath: filer.DirectoryEtcRemote, + }, + }, + wantUpdate: true, + wantRemoval: false, + }, + { + name: "rename within", + resp: &filer_pb.SubscribeMetadataResponse{ + Directory: filer.DirectoryEtcRemote, + EventNotification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{Name: "remote.conf"}, + NewEntry: &filer_pb.Entry{Name: "renamed.conf"}, + NewParentPath: filer.DirectoryEtcRemote, + }, + }, + wantUpdate: true, + wantRemoval: false, + }, + { + name: "delete", + resp: &filer_pb.SubscribeMetadataResponse{ + Directory: filer.DirectoryEtcRemote, + EventNotification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{Name: "remote.conf"}, + }, + }, + wantUpdate: false, + wantRemoval: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := metadataEventUpdatesDirectory(tt.resp, filer.DirectoryEtcRemote); got != tt.wantUpdate { + t.Fatalf("metadataEventUpdatesDirectory() = %v, want %v", got, tt.wantUpdate) + } + if got := metadataEventRemovesFromDirectory(tt.resp, filer.DirectoryEtcRemote); got != tt.wantRemoval { + t.Fatalf("metadataEventRemovesFromDirectory() = %v, want %v", got, tt.wantRemoval) + } + }) + } +} diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go index 16874855b..ba83aeb78 100644 --- a/weed/filer/filer_notify.go +++ b/weed/filer/filer_notify.go @@ -26,6 +26,10 @@ func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry } func (f *Filer) notifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry, deleteChunks, isFromOtherCluster bool, signatures []int32) *filer_pb.SubscribeMetadataResponse { + if metadataEventsSuppressed(ctx) { + return nil + } + var fullpath string if oldEntry != nil { fullpath = string(oldEntry.FullPath) diff --git a/weed/filer/filer_on_meta_event.go b/weed/filer/filer_on_meta_event.go index 4ee80b3a6..b5d7d0341 100644 --- a/weed/filer/filer_on_meta_event.go +++ b/weed/filer/filer_on_meta_event.go @@ -19,17 +19,25 @@ func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse) func (f *Filer) onBucketEvents(event *filer_pb.SubscribeMetadataResponse) { message := event.EventNotification + oldDir := event.Directory + newDir := filer_pb.MetadataEventTargetDirectory(event) - if f.DirBucketsPath == event.Directory { - if filer_pb.IsCreate(event) { - if message.NewEntry.IsDirectory { - f.Store.OnBucketCreation(message.NewEntry.Name) - } + if filer_pb.IsCreate(event) { + if newDir == f.DirBucketsPath && message.NewEntry.IsDirectory { + f.Store.OnBucketCreation(message.NewEntry.Name) } - if filer_pb.IsDelete(event) { - if message.OldEntry.IsDirectory { - f.Store.OnBucketDeletion(message.OldEntry.Name) - } + } + if filer_pb.IsDelete(event) { + if oldDir == f.DirBucketsPath && message.OldEntry.IsDirectory { + f.Store.OnBucketDeletion(message.OldEntry.Name) + } + } + if filer_pb.IsRename(event) { + if oldDir == f.DirBucketsPath && message.OldEntry.IsDirectory { + f.Store.OnBucketDeletion(message.OldEntry.Name) + } + if newDir == f.DirBucketsPath && message.NewEntry.IsDirectory { + f.Store.OnBucketCreation(message.NewEntry.Name) } } } @@ -72,10 +80,8 @@ func (f *Filer) onEmptyFolderCleanupEvents(event *filer_pb.SubscribeMetadataResp } func (f *Filer) maybeReloadFilerConfiguration(event *filer_pb.SubscribeMetadataResponse) { - if DirectoryEtcSeaweedFS != event.Directory { - if DirectoryEtcSeaweedFS != event.EventNotification.NewParentPath { - return - } + if !filer_pb.MetadataEventTouchesDirectory(event, DirectoryEtcSeaweedFS) { + return } entry := event.EventNotification.NewEntry diff --git a/weed/filer/filer_on_meta_event_test.go b/weed/filer/filer_on_meta_event_test.go new file mode 100644 index 000000000..330e711ef --- /dev/null +++ b/weed/filer/filer_on_meta_event_test.go @@ -0,0 +1,70 @@ +package filer + +import ( + "context" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +type bucketTrackingStore struct { + created []string + deleted []string +} + +func (s *bucketTrackingStore) GetName() string { return "bucket-tracking" } +func (s *bucketTrackingStore) Initialize(configuration util.Configuration, prefix string) error { + return nil +} +func (s *bucketTrackingStore) InsertEntry(context.Context, *Entry) error { return nil } +func (s *bucketTrackingStore) UpdateEntry(context.Context, *Entry) error { return nil } +func (s *bucketTrackingStore) FindEntry(context.Context, util.FullPath) (*Entry, error) { + return nil, filer_pb.ErrNotFound +} +func (s *bucketTrackingStore) DeleteEntry(context.Context, util.FullPath) error { return nil } +func (s *bucketTrackingStore) DeleteFolderChildren(context.Context, util.FullPath) error { return nil } +func (s *bucketTrackingStore) ListDirectoryEntries(context.Context, util.FullPath, string, bool, int64, ListEachEntryFunc) (string, error) { + return "", nil +} +func (s *bucketTrackingStore) ListDirectoryPrefixedEntries(context.Context, util.FullPath, string, bool, int64, string, ListEachEntryFunc) (string, error) { + return "", nil +} +func (s *bucketTrackingStore) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} +func (s *bucketTrackingStore) CommitTransaction(context.Context) error { return nil } +func (s *bucketTrackingStore) RollbackTransaction(context.Context) error { return nil } +func (s *bucketTrackingStore) KvPut(context.Context, []byte, []byte) error { return nil } +func (s *bucketTrackingStore) KvGet(context.Context, []byte) ([]byte, error) { + return nil, ErrKvNotFound +} +func (s *bucketTrackingStore) KvDelete(context.Context, []byte) error { return nil } +func (s *bucketTrackingStore) Shutdown() {} +func (s *bucketTrackingStore) OnBucketCreation(bucket string) { s.created = append(s.created, bucket) } +func (s *bucketTrackingStore) OnBucketDeletion(bucket string) { s.deleted = append(s.deleted, bucket) } +func (s *bucketTrackingStore) CanDropWholeBucket() bool { return false } + +func TestOnBucketEventsRenameIntoBucketsRootCreatesBucket(t *testing.T) { + store := &bucketTrackingStore{} + f := &Filer{ + DirBucketsPath: "/buckets", + Store: NewFilerStoreWrapper(store), + } + + f.onBucketEvents(&filer_pb.SubscribeMetadataResponse{ + Directory: "/tmp", + EventNotification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{Name: "migrated", IsDirectory: true}, + NewEntry: &filer_pb.Entry{Name: "migrated", IsDirectory: true}, + NewParentPath: "/buckets", + }, + }) + + if len(store.created) != 1 || store.created[0] != "migrated" { + t.Fatalf("created buckets = %v, want [migrated]", store.created) + } + if len(store.deleted) != 0 { + t.Fatalf("deleted buckets = %v, want []", store.deleted) + } +} diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go index fd0eed2a8..0c8d692f2 100644 --- a/weed/filer/filerstore_wrapper.go +++ b/weed/filer/filerstore_wrapper.go @@ -25,10 +25,12 @@ type VirtualFilerStore interface { FilerStore DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error DeleteOneEntry(ctx context.Context, entry *Entry) error + InsertEntryKnownAbsent(ctx context.Context, entry *Entry) error AddPathSpecificStore(path string, storeId string, store FilerStore) OnBucketCreation(bucket string) OnBucketDeletion(bucket string) CanDropWholeBucket() bool + SameActualStore(a, b util.FullPath) bool } type FilerStoreWrapper struct { @@ -106,6 +108,13 @@ func (fsw *FilerStoreWrapper) getActualStore(path util.FullPath) (store FilerSto return } +// SameActualStore reports whether two paths resolve to the same underlying +// store. When path-specific stores are configured, different subtrees may +// be served by different backends. +func (fsw *FilerStoreWrapper) SameActualStore(a, b util.FullPath) bool { + return fsw.getActualStore(a) == fsw.getActualStore(b) +} + func (fsw *FilerStoreWrapper) getDefaultStore() (store FilerStore) { return fsw.defaultStore } @@ -143,6 +152,31 @@ func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) err return actualStore.InsertEntry(ctx, entry) } +// InsertEntryKnownAbsent skips the pre-insert FindEntry path when the caller has +// already established that the target path does not exist. +func (fsw *FilerStoreWrapper) InsertEntryKnownAbsent(ctx context.Context, entry *Entry) error { + ctx = context.WithoutCancel(ctx) + actualStore := fsw.getActualStore(entry.FullPath) + stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "insert").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "insert").Observe(time.Since(start).Seconds()) + }() + + filer_pb.BeforeEntrySerialization(entry.GetChunks()) + normalizeEntryMimeForStore(entry) + + if len(entry.HardLinkId) > 0 { + glog.V(4).InfofCtx(ctx, "InsertEntryKnownAbsent %s has HardLinkId %x counter=%d", + entry.FullPath, entry.HardLinkId, entry.HardLinkCounter) + if err := fsw.setHardLink(ctx, entry); err != nil { + return fmt.Errorf("setHardLink %x: %v", entry.HardLinkId, err) + } + } + + return actualStore.InsertEntry(ctx, entry) +} + func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error { ctx = context.WithoutCancel(ctx) actualStore := fsw.getActualStore(entry.FullPath) diff --git a/weed/filer/metadata_event_context.go b/weed/filer/metadata_event_context.go new file mode 100644 index 000000000..6db03b962 --- /dev/null +++ b/weed/filer/metadata_event_context.go @@ -0,0 +1,20 @@ +package filer + +import "context" + +type suppressMetadataEventsKey struct{} + +// WithSuppressedMetadataEvents disables automatic metadata event emission for +// nested filer operations that are part of a larger logical change, such as a +// rename implemented via create+delete. +func WithSuppressedMetadataEvents(ctx context.Context) context.Context { + return context.WithValue(ctx, suppressMetadataEventsKey{}, true) +} + +func metadataEventsSuppressed(ctx context.Context) bool { + if ctx == nil { + return false + } + suppressed, _ := ctx.Value(suppressMetadataEventsKey{}).(bool) + return suppressed +} diff --git a/weed/mount/filer_conf.go b/weed/mount/filer_conf.go index 48c9c98c7..ce631f644 100644 --- a/weed/mount/filer_conf.go +++ b/weed/mount/filer_conf.go @@ -46,19 +46,11 @@ func (wfs *WFS) subscribeFilerConfEvents() (*meta_cache.MetadataFollower, error) } processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { - message := resp.EventNotification - if message.NewEntry == nil { + if !isFilerConfUpdateEvent(resp, confDir, confName) { return nil } - dir := resp.Directory - name := resp.EventNotification.NewEntry.Name - - if dir != confDir || name != confName { - return nil - } - - content := message.NewEntry.Content + content := resp.EventNotification.NewEntry.Content fc := filer.NewFilerConf() if len(content) > 0 { if err = fc.LoadFromBytes(content); err != nil { @@ -76,6 +68,14 @@ func (wfs *WFS) subscribeFilerConfEvents() (*meta_cache.MetadataFollower, error) }, nil } +func isFilerConfUpdateEvent(resp *filer_pb.SubscribeMetadataResponse, confDir, confName string) bool { + if resp == nil || resp.EventNotification == nil || resp.EventNotification.NewEntry == nil { + return false + } + return filer_pb.MetadataEventTargetDirectory(resp) == confDir && + resp.EventNotification.NewEntry.Name == confName +} + func (wfs *WFS) wormEnforcedForEntry(path util.FullPath, entry *filer_pb.Entry) (wormEnforced, wormEnabled bool) { if entry == nil || wfs.FilerConf == nil { return false, false diff --git a/weed/mount/filer_conf_test.go b/weed/mount/filer_conf_test.go new file mode 100644 index 000000000..6c8af5259 --- /dev/null +++ b/weed/mount/filer_conf_test.go @@ -0,0 +1,23 @@ +package mount + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" +) + +func TestIsFilerConfUpdateEventMatchesRenameTarget(t *testing.T) { + event := &filer_pb.SubscribeMetadataResponse{ + Directory: "/tmp", + EventNotification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{Name: filer.FilerConfName}, + NewEntry: &filer_pb.Entry{Name: filer.FilerConfName}, + NewParentPath: filer.DirectoryEtcSeaweedFS, + }, + } + + if !isFilerConfUpdateEvent(event, filer.DirectoryEtcSeaweedFS, filer.FilerConfName) { + t.Fatalf("expected rename target to match filer.conf watcher") + } +} diff --git a/weed/notification/webhook/filter.go b/weed/notification/webhook/filter.go index f346d6c93..659bc3f89 100644 --- a/weed/notification/webhook/filter.go +++ b/weed/notification/webhook/filter.go @@ -5,6 +5,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" ) type filter struct { @@ -40,25 +41,37 @@ func newFilter(cfg *config) *filter { } func (f *filter) shouldPublish(key string, notification *filer_pb.EventNotification) bool { - if !f.matchesPath(key) { + if !f.matchesPath(key, notification) { return false } - eventType := detectEventType(notification) + eventType := detectEventType(key, notification) return f.eventTypes[eventType] } -func (f *filter) matchesPath(key string) bool { +func (f *filter) matchesPath(key string, notification *filer_pb.EventNotification) bool { if len(f.pathPrefixes) == 0 { return true } + if f.matchesAnyPathPrefix(key) { + return true + } + + if notification != nil && notification.NewEntry != nil && notification.NewParentPath != "" { + newKey := string(util.FullPath(notification.NewParentPath).Child(notification.NewEntry.Name)) + return f.matchesAnyPathPrefix(newKey) + } + + return false +} + +func (f *filter) matchesAnyPathPrefix(key string) bool { for _, prefix := range f.pathPrefixes { if strings.HasPrefix(key, prefix) { return true } } - return false } diff --git a/weed/notification/webhook/filter_test.go b/weed/notification/webhook/filter_test.go index e95a085fe..34332dccf 100644 --- a/weed/notification/webhook/filter_test.go +++ b/weed/notification/webhook/filter_test.go @@ -9,6 +9,7 @@ import ( func TestFilterEventTypes(t *testing.T) { tests := []struct { name string + key string eventTypes []string notification *filer_pb.EventNotification expectedType eventType @@ -16,6 +17,7 @@ func TestFilterEventTypes(t *testing.T) { }{ { name: "create event - allowed", + key: "/test/test.txt", eventTypes: []string{"create", "delete"}, notification: &filer_pb.EventNotification{ NewEntry: &filer_pb.Entry{Name: "test.txt"}, @@ -25,6 +27,7 @@ func TestFilterEventTypes(t *testing.T) { }, { name: "create event - not allowed", + key: "/test/test.txt", eventTypes: []string{"delete", "update"}, notification: &filer_pb.EventNotification{ NewEntry: &filer_pb.Entry{Name: "test.txt"}, @@ -34,6 +37,7 @@ func TestFilterEventTypes(t *testing.T) { }, { name: "delete event - allowed", + key: "/test/test.txt", eventTypes: []string{"create", "delete"}, notification: &filer_pb.EventNotification{ OldEntry: &filer_pb.Entry{Name: "test.txt"}, @@ -43,16 +47,19 @@ func TestFilterEventTypes(t *testing.T) { }, { name: "update event - allowed", + key: "/test/test.txt", eventTypes: []string{"update"}, notification: &filer_pb.EventNotification{ - OldEntry: &filer_pb.Entry{Name: "test.txt"}, - NewEntry: &filer_pb.Entry{Name: "test.txt"}, + OldEntry: &filer_pb.Entry{Name: "test.txt"}, + NewEntry: &filer_pb.Entry{Name: "test.txt"}, + NewParentPath: "/test", }, expectedType: eventTypeUpdate, shouldPublish: true, }, { name: "rename event - allowed", + key: "/old/path/old.txt", eventTypes: []string{"rename"}, notification: &filer_pb.EventNotification{ OldEntry: &filer_pb.Entry{Name: "old.txt"}, @@ -62,8 +69,21 @@ func TestFilterEventTypes(t *testing.T) { expectedType: eventTypeRename, shouldPublish: true, }, + { + name: "rename event same name different parent - allowed", + key: "/old/path/file.txt", + eventTypes: []string{"rename"}, + notification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{Name: "file.txt"}, + NewEntry: &filer_pb.Entry{Name: "file.txt"}, + NewParentPath: "/new/path", + }, + expectedType: eventTypeRename, + shouldPublish: true, + }, { name: "rename event - not allowed", + key: "/old/path/old.txt", eventTypes: []string{"create", "delete", "update"}, notification: &filer_pb.EventNotification{ OldEntry: &filer_pb.Entry{Name: "old.txt"}, @@ -75,6 +95,7 @@ func TestFilterEventTypes(t *testing.T) { }, { name: "all events allowed when empty", + key: "/test/test.txt", eventTypes: []string{}, notification: &filer_pb.EventNotification{ NewEntry: &filer_pb.Entry{Name: "test.txt"}, @@ -89,12 +110,12 @@ func TestFilterEventTypes(t *testing.T) { cfg := &config{eventTypes: tt.eventTypes} f := newFilter(cfg) - eventType := detectEventType(tt.notification) + eventType := detectEventType(tt.key, tt.notification) if eventType != tt.expectedType { t.Errorf("detectEventType() = %v, want %v", eventType, tt.expectedType) } - shouldPublish := f.shouldPublish("/test/path", tt.notification) + shouldPublish := f.shouldPublish(tt.key, tt.notification) if shouldPublish != tt.shouldPublish { t.Errorf("shouldPublish() = %v, want %v", shouldPublish, tt.shouldPublish) } @@ -106,60 +127,82 @@ func TestFilterPathPrefixes(t *testing.T) { tests := []struct { name string pathPrefixes []string + eventTypes []string key string + notification *filer_pb.EventNotification shouldPublish bool }{ { name: "matches single prefix", pathPrefixes: []string{"/data/"}, + eventTypes: []string{"create"}, key: "/data/file.txt", + notification: &filer_pb.EventNotification{NewEntry: &filer_pb.Entry{Name: "file.txt"}}, shouldPublish: true, }, { name: "matches one of multiple prefixes", pathPrefixes: []string{"/data/", "/logs/", "/tmp/"}, + eventTypes: []string{"create"}, key: "/logs/app.log", + notification: &filer_pb.EventNotification{NewEntry: &filer_pb.Entry{Name: "app.log"}}, shouldPublish: true, }, { name: "no match", pathPrefixes: []string{"/data/", "/logs/"}, + eventTypes: []string{"create"}, key: "/other/file.txt", + notification: &filer_pb.EventNotification{NewEntry: &filer_pb.Entry{Name: "file.txt"}}, shouldPublish: false, }, { name: "empty prefixes allows all", pathPrefixes: []string{}, + eventTypes: []string{"create"}, key: "/any/path/file.txt", + notification: &filer_pb.EventNotification{NewEntry: &filer_pb.Entry{Name: "file.txt"}}, shouldPublish: true, }, { name: "exact prefix match", pathPrefixes: []string{"/data"}, + eventTypes: []string{"create"}, key: "/data", + notification: &filer_pb.EventNotification{NewEntry: &filer_pb.Entry{Name: "data"}}, shouldPublish: true, }, { name: "partial match not allowed", pathPrefixes: []string{"/data/"}, + eventTypes: []string{"create"}, key: "/database/file.txt", + notification: &filer_pb.EventNotification{NewEntry: &filer_pb.Entry{Name: "file.txt"}}, shouldPublish: false, }, + { + name: "rename matches destination prefix", + pathPrefixes: []string{"/watched/"}, + eventTypes: []string{"rename"}, + key: "/outside/old.txt", + notification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{Name: "old.txt"}, + NewEntry: &filer_pb.Entry{Name: "new.txt"}, + NewParentPath: "/watched", + }, + shouldPublish: true, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { cfg := &config{ pathPrefixes: tt.pathPrefixes, - eventTypes: []string{"create"}, + eventTypes: tt.eventTypes, } f := newFilter(cfg) - notification := &filer_pb.EventNotification{ - NewEntry: &filer_pb.Entry{Name: "test.txt"}, - } - - shouldPublish := f.shouldPublish(tt.key, notification) + shouldPublish := f.shouldPublish(tt.key, tt.notification) if shouldPublish != tt.shouldPublish { t.Errorf("shouldPublish() = %v, want %v", shouldPublish, tt.shouldPublish) } diff --git a/weed/notification/webhook/types.go b/weed/notification/webhook/types.go index 5cd79c7da..af1fb0242 100644 --- a/weed/notification/webhook/types.go +++ b/weed/notification/webhook/types.go @@ -59,7 +59,7 @@ func newWebhookMessage(key string, message proto.Message) *webhookMessage { return nil } - eventType := string(detectEventType(notification)) + eventType := string(detectEventType(key, notification)) return &webhookMessage{ Key: key, @@ -157,10 +157,9 @@ func (c *config) validate() error { return nil } -func detectEventType(notification *filer_pb.EventNotification) eventType { +func detectEventType(key string, notification *filer_pb.EventNotification) eventType { hasOldEntry := notification.OldEntry != nil hasNewEntry := notification.NewEntry != nil - hasNewParentPath := notification.NewParentPath != "" if !hasOldEntry && hasNewEntry { return eventTypeCreate @@ -171,7 +170,12 @@ func detectEventType(notification *filer_pb.EventNotification) eventType { } if hasOldEntry && hasNewEntry { - if hasNewParentPath { + oldDir, _ := util.FullPath(key).DirAndName() + newDir := notification.NewParentPath + if newDir == "" { + newDir = oldDir + } + if oldDir != newDir || notification.OldEntry.Name != notification.NewEntry.Name { return eventTypeRename } diff --git a/weed/pb/filer_pb/filer_pb_helper.go b/weed/pb/filer_pb/filer_pb_helper.go index a6d6fa10b..05d5f602a 100644 --- a/weed/pb/filer_pb/filer_pb_helper.go +++ b/weed/pb/filer_pb/filer_pb_helper.go @@ -11,6 +11,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/util" "github.com/viant/ptrie" "google.golang.org/protobuf/proto" ) @@ -247,6 +248,119 @@ func IsRename(event *SubscribeMetadataResponse) bool { event.EventNotification.NewEntry.Name != event.EventNotification.OldEntry.Name) } +func MetadataEventSourceDirectory(event *SubscribeMetadataResponse) string { + if event == nil { + return "" + } + return event.Directory +} + +func MetadataEventTargetDirectory(event *SubscribeMetadataResponse) string { + if event == nil { + return "" + } + if event.EventNotification != nil && event.EventNotification.NewParentPath != "" { + return event.EventNotification.NewParentPath + } + return event.Directory +} + +func metadataEventSourceEntryName(event *SubscribeMetadataResponse) string { + if event == nil || event.EventNotification == nil { + return "" + } + if event.EventNotification.OldEntry != nil { + return event.EventNotification.OldEntry.Name + } + if event.EventNotification.NewEntry != nil { + return event.EventNotification.NewEntry.Name + } + return "" +} + +func metadataEventTargetEntryName(event *SubscribeMetadataResponse) string { + if event == nil || event.EventNotification == nil { + return "" + } + if event.EventNotification.NewEntry != nil { + return event.EventNotification.NewEntry.Name + } + if event.EventNotification.OldEntry != nil { + return event.EventNotification.OldEntry.Name + } + return "" +} + +func MetadataEventSourceFullPath(event *SubscribeMetadataResponse) string { + return util.Join(MetadataEventSourceDirectory(event), metadataEventSourceEntryName(event)) +} + +func MetadataEventTargetFullPath(event *SubscribeMetadataResponse) string { + return util.Join(MetadataEventTargetDirectory(event), metadataEventTargetEntryName(event)) +} + +func MetadataEventTouchesDirectory(event *SubscribeMetadataResponse, dir string) bool { + if MetadataEventSourceDirectory(event) == dir { + return true + } + return event != nil && + event.EventNotification != nil && + event.EventNotification.NewEntry != nil && + MetadataEventTargetDirectory(event) == dir +} + +func MetadataEventTouchesDirectoryPrefix(event *SubscribeMetadataResponse, prefix string) bool { + if strings.HasPrefix(MetadataEventSourceDirectory(event), prefix) { + return true + } + return event != nil && + event.EventNotification != nil && + event.EventNotification.NewEntry != nil && + strings.HasPrefix(MetadataEventTargetDirectory(event), prefix) +} + +func MetadataEventMatchesSubscription(event *SubscribeMetadataResponse, pathPrefix string, pathPrefixes []string, directories []string) bool { + if event == nil { + return false + } + + if metadataEventMatchesPath(MetadataEventSourceFullPath(event), MetadataEventSourceDirectory(event), pathPrefix, pathPrefixes, directories) { + return true + } + + return event.EventNotification != nil && + event.EventNotification.NewEntry != nil && + metadataEventMatchesPath(MetadataEventTargetFullPath(event), MetadataEventTargetDirectory(event), pathPrefix, pathPrefixes, directories) +} + +func metadataEventMatchesPath(fullPath, dirPath, pathPrefix string, pathPrefixes []string, directories []string) bool { + if hasPrefixIn(fullPath, pathPrefixes) { + return true + } + if matchByDirectory(dirPath, directories) { + return true + } + return strings.HasPrefix(fullPath, pathPrefix) +} + +func hasPrefixIn(text string, prefixes []string) bool { + for _, p := range prefixes { + if strings.HasPrefix(text, p) { + return true + } + } + return false +} + +func matchByDirectory(dirPath string, directories []string) bool { + for _, dir := range directories { + if dirPath == dir { + return true + } + } + return false +} + var _ = ptrie.KeyProvider(&FilerConf_PathConf{}) func (fp *FilerConf_PathConf) Key() interface{} { diff --git a/weed/pb/filer_pb/filer_pb_helper_test.go b/weed/pb/filer_pb/filer_pb_helper_test.go index 4e0b53017..b38b094e3 100644 --- a/weed/pb/filer_pb/filer_pb_helper_test.go +++ b/weed/pb/filer_pb/filer_pb_helper_test.go @@ -15,3 +15,73 @@ func TestFileIdSize(t *testing.T) { println(len(fileIdStr)) println(len(bytes)) } + +func TestMetadataEventMatchesSubscription(t *testing.T) { + event := &SubscribeMetadataResponse{ + Directory: "/tmp", + EventNotification: &EventNotification{ + OldEntry: &Entry{Name: "old-name"}, + NewEntry: &Entry{Name: "new-name"}, + NewParentPath: "/watched", + }, + } + + tests := []struct { + name string + pathPrefix string + pathPrefixes []string + directories []string + }{ + { + name: "primary path prefix matches rename target", + pathPrefix: "/watched/new-name", + }, + { + name: "additional path prefix matches rename target", + pathPrefix: "/data", + pathPrefixes: []string{"/watched"}, + }, + { + name: "directory watch matches rename target directory", + pathPrefix: "/data", + directories: []string{"/watched"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if !MetadataEventMatchesSubscription(event, tt.pathPrefix, tt.pathPrefixes, tt.directories) { + t.Fatalf("MetadataEventMatchesSubscription returned false") + } + }) + } +} + +func TestMetadataEventTouchesDirectoryHelpers(t *testing.T) { + renameInto := &SubscribeMetadataResponse{ + Directory: "/tmp", + EventNotification: &EventNotification{ + OldEntry: &Entry{Name: "filer.conf"}, + NewEntry: &Entry{Name: "filer.conf"}, + NewParentPath: "/etc/seaweedfs", + }, + } + if got := MetadataEventTargetDirectory(renameInto); got != "/etc/seaweedfs" { + t.Fatalf("MetadataEventTargetDirectory = %q, want /etc/seaweedfs", got) + } + if !MetadataEventTouchesDirectory(renameInto, "/etc/seaweedfs") { + t.Fatalf("expected rename target to touch /etc/seaweedfs") + } + + renameOut := &SubscribeMetadataResponse{ + Directory: "/etc/remote", + EventNotification: &EventNotification{ + OldEntry: &Entry{Name: "remote.conf"}, + NewEntry: &Entry{Name: "remote.conf"}, + NewParentPath: "/tmp", + }, + } + if !MetadataEventTouchesDirectoryPrefix(renameOut, "/etc/remote") { + t.Fatalf("expected rename source to touch /etc/remote") + } +} diff --git a/weed/pb/filer_pb_direct_read.go b/weed/pb/filer_pb_direct_read.go index 7da56a215..294b78c7a 100644 --- a/weed/pb/filer_pb_direct_read.go +++ b/weed/pb/filer_pb_direct_read.go @@ -255,53 +255,14 @@ func processOneLogEntry(logEntry *filer_pb.LogEntry, filter PathFilter, processE const systemLogDir = "/topics/.system/log" func matchesFilter(resp *filer_pb.SubscribeMetadataResponse, filter PathFilter) bool { - var entryName string - if resp.EventNotification != nil { - if resp.EventNotification.OldEntry != nil { - entryName = resp.EventNotification.OldEntry.Name - } else if resp.EventNotification.NewEntry != nil { - entryName = resp.EventNotification.NewEntry.Name - } - } - - fullpath := util.Join(resp.Directory, entryName) + fullpath := filer_pb.MetadataEventSourceFullPath(resp) // Skip internal meta log entries if strings.HasPrefix(fullpath, systemLogDir) { return false } - // Check AdditionalPathPrefixes - for _, p := range filter.AdditionalPathPrefixes { - if strings.HasPrefix(fullpath, p) { - return true - } - } - - // Check DirectoriesToWatch (exact directory match) - for _, dir := range filter.DirectoriesToWatch { - if resp.Directory == dir { - return true - } - } - - // Check primary PathPrefix - if filter.PathPrefix == "" || filter.PathPrefix == "/" { - return true - } - if strings.HasPrefix(fullpath, filter.PathPrefix) { - return true - } - - // Check rename target - if resp.EventNotification != nil && resp.EventNotification.NewParentPath != "" { - newFullPath := util.Join(resp.EventNotification.NewParentPath, entryName) - if strings.HasPrefix(newFullPath, filter.PathPrefix) { - return true - } - } - - return false + return filer_pb.MetadataEventMatchesSubscription(resp, filter.PathPrefix, filter.AdditionalPathPrefixes, filter.DirectoriesToWatch) } // isChunkNotFound checks if an error indicates a missing volume chunk. @@ -323,10 +284,10 @@ type logEntryHeapItem struct { type logEntryHeap []*logEntryHeapItem -func (h logEntryHeap) Len() int { return len(h) } -func (h logEntryHeap) Less(i, j int) bool { return h[i].entry.TsNs < h[j].entry.TsNs } -func (h logEntryHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } -func (h *logEntryHeap) Push(x any) { *h = append(*h, x.(*logEntryHeapItem)) } +func (h logEntryHeap) Len() int { return len(h) } +func (h logEntryHeap) Less(i, j int) bool { return h[i].entry.TsNs < h[j].entry.TsNs } +func (h logEntryHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h *logEntryHeap) Push(x any) { *h = append(*h, x.(*logEntryHeapItem)) } func (h *logEntryHeap) Pop() any { old := *h n := len(old) diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index 5ea7c3a2c..c992906fa 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -3,7 +3,6 @@ package replication import ( "context" "fmt" - "strings" "time" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -39,56 +38,125 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p if message.IsFromOtherCluster && r.sink.GetName() == "filer" { return nil } - if !strings.HasPrefix(key, r.source.Dir) { - glog.V(4).Infof("skipping %v outside of %v", key, r.source.Dir) - return nil - } - for _, excludeDir := range r.excludeDirs { - if strings.HasPrefix(key, excludeDir) { - glog.V(4).Infof("skipping %v of exclude dir %v", key, excludeDir) + + oldEntry := message.OldEntry + newEntry := message.NewEntry + newParentPath := message.NewParentPath + + oldInSource := util.IsEqualOrUnder(key, r.source.Dir) && !r.isExcluded(key) + + // For rename events (both old and new entry present), check both paths + // against the source directory. Convert cross-boundary renames to + // create or delete so the sink stays consistent. + if oldEntry != nil && newEntry != nil { + newFullPath, targetParent := metadataEventTarget(key, newEntry, newParentPath) + newInSource := util.IsEqualOrUnder(newFullPath, r.source.Dir) && !r.isExcluded(newFullPath) + + if !oldInSource && !newInSource { return nil } + if !oldInSource { + // Rename into watched directory: treat as create + oldEntry = nil + key = newFullPath + newParentPath = targetParent + } else if !newInSource { + // Rename out of watched directory: treat as delete + newEntry = nil + newParentPath = "" + } + } else if !oldInSource { + glog.V(4).Infof("skipping %v outside of %v", key, r.source.Dir) + return nil } var dateKey string if r.sink.IsIncremental() { var mTime int64 - if message.NewEntry != nil { - mTime = message.NewEntry.Attributes.Mtime - } else if message.OldEntry != nil { - mTime = message.OldEntry.Attributes.Mtime + if newEntry != nil { + mTime = newEntry.Attributes.Mtime + } else if oldEntry != nil { + mTime = oldEntry.Attributes.Mtime } dateKey = time.Unix(mTime, 0).Format("2006-01-02") } - newKey := util.Join(r.sink.GetSinkToDirectory(), dateKey, key[len(r.source.Dir):]) - glog.V(3).Infof("replicate %s => %s", key, newKey) - key = newKey - if message.OldEntry != nil && message.NewEntry == nil { - glog.V(4).Infof("deleting %v", key) - return r.sink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures) + oldSinkKey := r.sourceToSinkKey(key, dateKey) + glog.V(3).Infof("replicate %s => %s", key, oldSinkKey) + + newSinkKey := oldSinkKey + newSinkParentPath := newParentPath + if oldEntry != nil && newEntry != nil { + targetSourceKey, targetSourceParent := metadataEventTarget(key, newEntry, newParentPath) + newSinkKey = r.sourceToSinkKey(targetSourceKey, dateKey) + newSinkParentPath = r.sourceToSinkPath(targetSourceParent, dateKey) + } else if newParentPath != "" && util.IsEqualOrUnder(newParentPath, r.source.Dir) { + newSinkParentPath = r.sourceToSinkPath(newParentPath, dateKey) + } + + if oldEntry != nil && newEntry == nil { + glog.V(4).Infof("deleting %v", oldSinkKey) + return r.sink.DeleteEntry(oldSinkKey, oldEntry.IsDirectory, message.DeleteChunks, message.Signatures) } - if message.OldEntry == nil && message.NewEntry != nil { - glog.V(4).Infof("creating %v", key) - return r.sink.CreateEntry(key, message.NewEntry, message.Signatures) + if oldEntry == nil && newEntry != nil { + glog.V(4).Infof("creating %v", oldSinkKey) + return r.sink.CreateEntry(oldSinkKey, newEntry, message.Signatures) } - if message.OldEntry == nil && message.NewEntry == nil { + if oldEntry == nil && newEntry == nil { glog.V(0).Infof("weird message %+v", message) return nil } - foundExisting, err := r.sink.UpdateEntry(key, message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures) + if oldSinkKey != newSinkKey && r.sink.GetName() != "filer" { + if err := r.sink.DeleteEntry(oldSinkKey, oldEntry.IsDirectory, false, message.Signatures); err != nil { + return fmt.Errorf("delete old entry %v: %w", oldSinkKey, err) + } + glog.V(4).Infof("creating renamed %v", newSinkKey) + return r.sink.CreateEntry(newSinkKey, newEntry, message.Signatures) + } + + foundExisting, err := r.sink.UpdateEntry(oldSinkKey, oldEntry, newSinkParentPath, newEntry, message.DeleteChunks, message.Signatures) if foundExisting { - glog.V(4).Infof("updated %v", key) + glog.V(4).Infof("updated %v", oldSinkKey) return err } - err = r.sink.DeleteEntry(key, message.OldEntry.IsDirectory, false, message.Signatures) + err = r.sink.DeleteEntry(oldSinkKey, oldEntry.IsDirectory, false, message.Signatures) if err != nil { - return fmt.Errorf("delete old entry %v: %w", key, err) + return fmt.Errorf("delete old entry %v: %w", oldSinkKey, err) + } + + glog.V(4).Infof("creating missing %v", newSinkKey) + return r.sink.CreateEntry(newSinkKey, newEntry, message.Signatures) +} + +func (r *Replicator) isExcluded(path string) bool { + for _, excludeDir := range r.excludeDirs { + if util.IsEqualOrUnder(path, excludeDir) { + return true + } + } + return false +} + +func (r *Replicator) sourceToSinkKey(sourceKey, dateKey string) string { + return util.Join(r.sink.GetSinkToDirectory(), dateKey, sourceKey[len(r.source.Dir):]) +} + +func (r *Replicator) sourceToSinkPath(sourcePath, dateKey string) string { + return util.Join(r.sink.GetSinkToDirectory(), dateKey, sourcePath[len(r.source.Dir):]) +} + +func metadataEventTarget(key string, newEntry *filer_pb.Entry, newParentPath string) (targetKey, targetParent string) { + if newEntry == nil { + return "", "" + } + + targetParent = newParentPath + if targetParent == "" { + targetParent, _ = util.FullPath(key).DirAndName() } - glog.V(4).Infof("creating missing %v", key) - return r.sink.CreateEntry(key, message.NewEntry, message.Signatures) + return util.Join(targetParent, newEntry.Name), targetParent } func ReadFilerSignature(grpcDialOption grpc.DialOption, filer pb.ServerAddress) (filerSignature int32, readErr error) { diff --git a/weed/replication/replicator_test.go b/weed/replication/replicator_test.go new file mode 100644 index 000000000..b2ef2c2c6 --- /dev/null +++ b/weed/replication/replicator_test.go @@ -0,0 +1,284 @@ +package replication + +import ( + "context" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/replication/sink" + "github.com/seaweedfs/seaweedfs/weed/replication/source" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +var _ sink.ReplicationSink = (*recordingSink)(nil) + +type deleteCall struct { + key string + isDirectory bool +} + +type createCall struct { + key string +} + +type updateCall struct { + key string + newParentPath string +} + +type recordingSink struct { + name string + sinkToDirectory string + incremental bool + updateFoundExisting bool + + deleteCalls []deleteCall + createCalls []createCall + updateCalls []updateCall +} + +func (s *recordingSink) GetName() string { + return s.name +} + +func (s *recordingSink) Initialize(util.Configuration, string) error { + return nil +} + +func (s *recordingSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error { + s.deleteCalls = append(s.deleteCalls, deleteCall{key: key, isDirectory: isDirectory}) + return nil +} + +func (s *recordingSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error { + s.createCalls = append(s.createCalls, createCall{key: key}) + return nil +} + +func (s *recordingSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (bool, error) { + s.updateCalls = append(s.updateCalls, updateCall{key: key, newParentPath: newParentPath}) + return s.updateFoundExisting, nil +} + +func (s *recordingSink) GetSinkToDirectory() string { + return s.sinkToDirectory +} + +func (s *recordingSink) SetSourceFiler(*source.FilerSource) {} + +func (s *recordingSink) IsIncremental() bool { + return s.incremental +} + +func TestReplicateRenameUsesTargetKeyForNonFilerSink(t *testing.T) { + s := &recordingSink{name: "local", sinkToDirectory: "/dest"} + r := &Replicator{ + sink: s, + source: &source.FilerSource{Dir: "/source"}, + } + + err := r.Replicate(context.Background(), "/source/old/file.txt", &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{ + Name: "file.txt", + Attributes: &filer_pb.FuseAttributes{ + Mtime: 123, + }, + }, + NewEntry: &filer_pb.Entry{ + Name: "renamed.txt", + Attributes: &filer_pb.FuseAttributes{ + Mtime: 123, + }, + }, + NewParentPath: "/source/new", + }) + if err != nil { + t.Fatalf("Replicate rename: %v", err) + } + + if len(s.updateCalls) != 0 { + t.Fatalf("expected non-filer rename to bypass UpdateEntry, got %d calls", len(s.updateCalls)) + } + if len(s.deleteCalls) != 1 || s.deleteCalls[0].key != "/dest/old/file.txt" { + t.Fatalf("delete calls = %+v, want old sink key", s.deleteCalls) + } + if len(s.createCalls) != 1 || s.createCalls[0].key != "/dest/new/renamed.txt" { + t.Fatalf("create calls = %+v, want target sink key", s.createCalls) + } +} + +func TestReplicateRenameUsesUpdateForFilerSink(t *testing.T) { + s := &recordingSink{ + name: "filer", + sinkToDirectory: "/dest", + updateFoundExisting: true, + } + r := &Replicator{ + sink: s, + source: &source.FilerSource{Dir: "/source"}, + } + + err := r.Replicate(context.Background(), "/source/old/file.txt", &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{ + Name: "file.txt", + Attributes: &filer_pb.FuseAttributes{ + Mtime: 123, + }, + }, + NewEntry: &filer_pb.Entry{ + Name: "renamed.txt", + Attributes: &filer_pb.FuseAttributes{ + Mtime: 123, + }, + }, + NewParentPath: "/source/new", + }) + if err != nil { + t.Fatalf("Replicate rename: %v", err) + } + + if len(s.updateCalls) != 1 { + t.Fatalf("update calls = %d, want 1", len(s.updateCalls)) + } + if s.updateCalls[0].key != "/dest/old/file.txt" { + t.Fatalf("update key = %q, want /dest/old/file.txt", s.updateCalls[0].key) + } + if s.updateCalls[0].newParentPath != "/dest/new" { + t.Fatalf("update newParentPath = %q, want /dest/new", s.updateCalls[0].newParentPath) + } + if len(s.deleteCalls) != 0 || len(s.createCalls) != 0 { + t.Fatalf("unexpected delete/create calls: deletes=%+v creates=%+v", s.deleteCalls, s.createCalls) + } +} + +func TestReplicateRenameFallbackCreatesTargetKey(t *testing.T) { + s := &recordingSink{ + name: "filer", + sinkToDirectory: "/dest", + updateFoundExisting: false, + } + r := &Replicator{ + sink: s, + source: &source.FilerSource{Dir: "/source"}, + } + + err := r.Replicate(context.Background(), "/source/old/file.txt", &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{ + Name: "file.txt", + Attributes: &filer_pb.FuseAttributes{ + Mtime: 123, + }, + }, + NewEntry: &filer_pb.Entry{ + Name: "renamed.txt", + Attributes: &filer_pb.FuseAttributes{ + Mtime: 123, + }, + }, + NewParentPath: "/source/new", + }) + if err != nil { + t.Fatalf("Replicate rename fallback: %v", err) + } + + if len(s.updateCalls) != 1 { + t.Fatalf("update calls = %d, want 1", len(s.updateCalls)) + } + if len(s.deleteCalls) != 1 || s.deleteCalls[0].key != "/dest/old/file.txt" { + t.Fatalf("delete calls = %+v, want old sink key", s.deleteCalls) + } + if len(s.createCalls) != 1 || s.createCalls[0].key != "/dest/new/renamed.txt" { + t.Fatalf("create calls = %+v, want target sink key", s.createCalls) + } +} + +func TestPathIsEqualOrUnderUsesDirectoryBoundaries(t *testing.T) { + tests := []struct { + name string + candidate string + other string + expected bool + }{ + {name: "equal", candidate: "/foo", other: "/foo", expected: true}, + {name: "descendant", candidate: "/foo/bar", other: "/foo", expected: true}, + {name: "sibling prefix", candidate: "/foobar/bar", other: "/foo", expected: false}, + {name: "root", candidate: "/foo/bar", other: "/", expected: true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := util.IsEqualOrUnder(tt.candidate, tt.other); got != tt.expected { + t.Fatalf("IsEqualOrUnder(%q, %q) = %v, want %v", tt.candidate, tt.other, got, tt.expected) + } + }) + } +} + +func TestReplicateRenameOutToSiblingPrefixBecomesDelete(t *testing.T) { + s := &recordingSink{name: "local", sinkToDirectory: "/dest"} + r := &Replicator{ + sink: s, + source: &source.FilerSource{Dir: "/foo"}, + } + + err := r.Replicate(context.Background(), "/foo/old/file.txt", &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{ + Name: "file.txt", + Attributes: &filer_pb.FuseAttributes{ + Mtime: 123, + }, + }, + NewEntry: &filer_pb.Entry{ + Name: "file.txt", + Attributes: &filer_pb.FuseAttributes{ + Mtime: 123, + }, + }, + NewParentPath: "/foobar/new", + }) + if err != nil { + t.Fatalf("Replicate rename out to sibling prefix: %v", err) + } + + if len(s.deleteCalls) != 1 || s.deleteCalls[0].key != "/dest/old/file.txt" { + t.Fatalf("delete calls = %+v, want old sink key", s.deleteCalls) + } + if len(s.createCalls) != 0 || len(s.updateCalls) != 0 { + t.Fatalf("unexpected create/update calls: creates=%+v updates=%+v", s.createCalls, s.updateCalls) + } +} + +func TestReplicateRenameFromExcludedDirBecomesCreate(t *testing.T) { + s := &recordingSink{name: "local", sinkToDirectory: "/dest"} + r := &Replicator{ + sink: s, + source: &source.FilerSource{Dir: "/foo"}, + excludeDirs: []string{"/foo/excluded"}, + } + + err := r.Replicate(context.Background(), "/foo/excluded/file.txt", &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{ + Name: "file.txt", + Attributes: &filer_pb.FuseAttributes{ + Mtime: 123, + }, + }, + NewEntry: &filer_pb.Entry{ + Name: "file.txt", + Attributes: &filer_pb.FuseAttributes{ + Mtime: 123, + }, + }, + NewParentPath: "/foo/live", + }) + if err != nil { + t.Fatalf("Replicate rename from excluded dir: %v", err) + } + + if len(s.createCalls) != 1 || s.createCalls[0].key != "/dest/live/file.txt" { + t.Fatalf("create calls = %+v, want target sink key", s.createCalls) + } + if len(s.deleteCalls) != 0 || len(s.updateCalls) != 0 { + t.Fatalf("unexpected delete/update calls: deletes=%+v updates=%+v", s.deleteCalls, s.updateCalls) + } +} diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go index 382ed69a8..ac970497f 100644 --- a/weed/server/filer_grpc_server_rename.go +++ b/weed/server/filer_grpc_server_rename.go @@ -34,7 +34,8 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom return nil, fmt.Errorf("%s/%s not found: %v", req.OldDirectory, req.OldName, err) } - moveErr := fs.moveEntry(ctx, nil, oldParent, oldEntry, newParent, req.NewName, req.Signatures) + var metadataEvents []metadataEvent + moveErr := fs.moveEntry(ctx, nil, oldParent, oldEntry, newParent, req.NewName, req.Signatures, false, &metadataEvents) if moveErr != nil { fs.filer.RollbackTransaction(ctx) return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr) @@ -44,6 +45,9 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom return nil, fmt.Errorf("%s/%s move commit error: %v", req.OldDirectory, req.OldName, commitError) } } + for _, event := range metadataEvents { + event.notify(fs.filer, ctx, req.Signatures) + } return &filer_pb.AtomicRenameEntryResponse{}, nil } @@ -87,7 +91,8 @@ func (fs *FilerServer) StreamRenameEntry(req *filer_pb.StreamRenameEntryRequest, } } - moveErr := fs.moveEntry(ctx, stream, oldParent, oldEntry, newParent, req.NewName, req.Signatures) + var metadataEvents []metadataEvent + moveErr := fs.moveEntry(ctx, stream, oldParent, oldEntry, newParent, req.NewName, req.Signatures, false, &metadataEvents) if moveErr != nil { fs.filer.RollbackTransaction(ctx) return fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr) @@ -97,27 +102,40 @@ func (fs *FilerServer) StreamRenameEntry(req *filer_pb.StreamRenameEntryRequest, return fmt.Errorf("%s/%s move commit error: %v", req.OldDirectory, req.OldName, commitError) } } + for _, event := range metadataEvents { + event.notify(fs.filer, ctx, req.Signatures) + } return nil } -func (fs *FilerServer) moveEntry(ctx context.Context, stream filer_pb.SeaweedFiler_StreamRenameEntryServer, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error { +type metadataEvent struct { + oldEntry *filer.Entry + newEntry *filer.Entry + deleteChunks bool +} + +func (event metadataEvent) notify(f *filer.Filer, ctx context.Context, signatures []int32) { + f.NotifyUpdateEvent(ctx, event.oldEntry, event.newEntry, event.deleteChunks, false, signatures) +} + +func (fs *FilerServer) moveEntry(ctx context.Context, stream filer_pb.SeaweedFiler_StreamRenameEntryServer, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32, skipTargetLookup bool, metadataEvents *[]metadataEvent) error { if err := fs.moveSelfEntry(ctx, stream, oldParent, entry, newParent, newName, func() error { if entry.IsDirectory() { - if err := fs.moveFolderSubEntries(ctx, stream, oldParent, entry, newParent, newName, signatures); err != nil { + if err := fs.moveFolderSubEntries(ctx, stream, oldParent, entry, newParent, newName, signatures, metadataEvents); err != nil { return err } } return nil - }, signatures); err != nil { + }, signatures, skipTargetLookup, metadataEvents); err != nil { return fmt.Errorf("fail to move %s => %s: %v", oldParent.Child(entry.Name()), newParent.Child(newName), err) } return nil } -func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, stream filer_pb.SeaweedFiler_StreamRenameEntryServer, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error { +func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, stream filer_pb.SeaweedFiler_StreamRenameEntryServer, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32, metadataEvents *[]metadataEvent) error { currentDirPath := oldParent.Child(entry.Name()) newDirPath := newParent.Child(newName) @@ -138,7 +156,9 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, stream filer_pb for _, item := range entries { lastFileName = item.Name() // println("processing", lastFileName) - err := fs.moveEntry(ctx, stream, currentDirPath, item, newDirPath, item.Name(), signatures) + newChildPath := newDirPath.Child(item.Name()) + skipTarget := fs.filer.Store.SameActualStore(newDirPath, newChildPath) + err := fs.moveEntry(ctx, stream, currentDirPath, item, newDirPath, item.Name(), signatures, skipTarget, metadataEvents) if err != nil { return err } @@ -150,7 +170,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, stream filer_pb return nil } -func (fs *FilerServer) moveSelfEntry(ctx context.Context, stream filer_pb.SeaweedFiler_StreamRenameEntryServer, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error, signatures []int32) error { +func (fs *FilerServer) moveSelfEntry(ctx context.Context, stream filer_pb.SeaweedFiler_StreamRenameEntryServer, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error, signatures []int32, skipTargetLookup bool, metadataEvents *[]metadataEvent) error { oldPath, newPath := oldParent.Child(entry.Name()), newParent.Child(newName) @@ -161,6 +181,18 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, stream filer_pb.Seawee return nil } + sourceEntry := entry.ShallowClone() + sourceEntry.FullPath = oldPath + + var existingTarget *filer.Entry + if !skipTargetLookup { + if targetEntry, findErr := fs.filer.FindEntry(ctx, newPath); findErr == nil { + existingTarget = targetEntry.ShallowClone() + } else if findErr != filer_pb.ErrNotFound { + return findErr + } + } + // add to new directory newEntry := &filer.Entry{ FullPath: newPath, @@ -173,8 +205,17 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, stream filer_pb.Seawee Remote: entry.Remote, Quota: entry.Quota, } - if createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, signatures, false, fs.filer.MaxFilenameLength); createErr != nil { - return createErr + if skipTargetLookup { + if newEntry.FullPath.IsLongerFileName(fs.filer.MaxFilenameLength) { + return filer_pb.ErrEntryNameTooLong + } + if createErr := fs.filer.Store.InsertEntryKnownAbsent(filer.WithSuppressedMetadataEvents(ctx), newEntry); createErr != nil { + return fmt.Errorf("insert entry %s: %v", newEntry.FullPath, createErr) + } + } else { + if createErr := fs.filer.CreateEntry(filer.WithSuppressedMetadataEvents(ctx), newEntry, false, false, signatures, false, fs.filer.MaxFilenameLength); createErr != nil { + return createErr + } } if stream != nil { if err := stream.Send(&filer_pb.StreamRenameEntryResponse{ @@ -195,6 +236,17 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, stream filer_pb.Seawee } } + if existingTarget != nil { + *metadataEvents = append(*metadataEvents, metadataEvent{ + oldEntry: existingTarget, + deleteChunks: true, + }) + } + *metadataEvents = append(*metadataEvents, metadataEvent{ + oldEntry: sourceEntry, + newEntry: newEntry, + }) + if moveFolderSubEntries != nil { if moveChildrenErr := moveFolderSubEntries(); moveChildrenErr != nil { return moveChildrenErr @@ -203,7 +255,7 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, stream filer_pb.Seawee // delete old entry ctx = context.WithValue(ctx, "OP", "MV") - deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false, signatures, 0) + deleteErr := fs.filer.DeleteEntryMetaAndData(filer.WithSuppressedMetadataEvents(ctx), oldPath, false, false, false, false, signatures, 0) if deleteErr != nil { return deleteErr } diff --git a/weed/server/filer_grpc_server_rename_test.go b/weed/server/filer_grpc_server_rename_test.go new file mode 100644 index 000000000..aec0c0740 --- /dev/null +++ b/weed/server/filer_grpc_server_rename_test.go @@ -0,0 +1,472 @@ +package weed_server + +import ( + "context" + "errors" + "os" + "sort" + "strings" + "sync" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/cluster" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/notification" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" + "github.com/seaweedfs/seaweedfs/weed/wdclient" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/proto" +) + +type renameTestStore struct { + mu sync.Mutex + entries map[string]*filer.Entry + findCalls map[string]int + commitErr error + deleteErr error +} + +func newRenameTestStore() *renameTestStore { + return &renameTestStore{ + entries: make(map[string]*filer.Entry), + findCalls: make(map[string]int), + } +} + +func (s *renameTestStore) GetName() string { return "rename_test" } +func (s *renameTestStore) Initialize(util.Configuration, string) error { return nil } +func (s *renameTestStore) Shutdown() {} +func (s *renameTestStore) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} +func (s *renameTestStore) CommitTransaction(context.Context) error { return s.commitErr } +func (s *renameTestStore) RollbackTransaction(context.Context) error { return nil } +func (s *renameTestStore) KvPut(context.Context, []byte, []byte) error { + return nil +} +func (s *renameTestStore) KvGet(context.Context, []byte) ([]byte, error) { + return nil, filer.ErrKvNotFound +} +func (s *renameTestStore) KvDelete(context.Context, []byte) error { return nil } + +func (s *renameTestStore) InsertEntry(_ context.Context, entry *filer.Entry) error { + s.mu.Lock() + defer s.mu.Unlock() + s.entries[string(entry.FullPath)] = entry.ShallowClone() + return nil +} + +func (s *renameTestStore) UpdateEntry(_ context.Context, entry *filer.Entry) error { + s.mu.Lock() + defer s.mu.Unlock() + s.entries[string(entry.FullPath)] = entry.ShallowClone() + return nil +} + +func (s *renameTestStore) FindEntry(_ context.Context, p util.FullPath) (*filer.Entry, error) { + s.mu.Lock() + defer s.mu.Unlock() + s.findCalls[string(p)]++ + entry, found := s.entries[string(p)] + if !found { + return nil, filer_pb.ErrNotFound + } + return entry.ShallowClone(), nil +} + +func (s *renameTestStore) DeleteEntry(_ context.Context, p util.FullPath) error { + if s.deleteErr != nil { + return s.deleteErr + } + s.mu.Lock() + defer s.mu.Unlock() + delete(s.entries, string(p)) + return nil +} + +func (s *renameTestStore) DeleteFolderChildren(_ context.Context, p util.FullPath) error { + s.mu.Lock() + defer s.mu.Unlock() + prefix := string(p) + "/" + for path := range s.entries { + if len(path) > len(prefix) && path[:len(prefix)] == prefix { + delete(s.entries, path) + } + } + return nil +} + +func (s *renameTestStore) listDirectoryEntries(dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (string, error) { + s.mu.Lock() + var entries []*filer.Entry + for path, entry := range s.entries { + if path == string(dirPath) { + continue + } + parent, _ := util.FullPath(path).DirAndName() + if parent != string(dirPath) { + continue + } + if prefix != "" && !strings.HasPrefix(entry.Name(), prefix) { + continue + } + entries = append(entries, entry.ShallowClone()) + } + s.mu.Unlock() + + sort.Slice(entries, func(i, j int) bool { + return entries[i].Name() < entries[j].Name() + }) + + count := int64(0) + lastFileName := "" + for _, entry := range entries { + name := entry.Name() + if startFileName != "" { + if includeStartFile { + if name < startFileName { + continue + } + } else if name <= startFileName { + continue + } + } + + lastFileName = name + if eachEntryFunc != nil { + includeMore, err := eachEntryFunc(entry) + if err != nil { + return lastFileName, err + } + if !includeMore { + return lastFileName, nil + } + } + + count++ + if limit > 0 && count >= limit { + break + } + } + + return lastFileName, nil +} + +func (s *renameTestStore) ListDirectoryEntries(_ context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) { + return s.listDirectoryEntries(dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc) +} + +func (s *renameTestStore) ListDirectoryPrefixedEntries(_ context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (string, error) { + return s.listDirectoryEntries(dirPath, startFileName, includeStartFile, limit, prefix, eachEntryFunc) +} + +func (s *renameTestStore) findEntryCallCount(path string) int { + s.mu.Lock() + defer s.mu.Unlock() + return s.findCalls[path] +} + +type capturedEvent struct { + key string + notification *filer_pb.EventNotification +} + +type captureQueue struct { + mu sync.Mutex + events []capturedEvent +} + +var notificationQueueSwapMu sync.Mutex + +func (q *captureQueue) GetName() string { return "capture" } +func (q *captureQueue) Initialize(util.Configuration, string) error { return nil } +func (q *captureQueue) SendMessage(key string, message proto.Message) error { + notification, ok := message.(*filer_pb.EventNotification) + if !ok { + return nil + } + + q.mu.Lock() + defer q.mu.Unlock() + q.events = append(q.events, capturedEvent{ + key: key, + notification: proto.Clone(notification).(*filer_pb.EventNotification), + }) + return nil +} + +func (q *captureQueue) snapshot() []capturedEvent { + q.mu.Lock() + defer q.mu.Unlock() + events := make([]capturedEvent, len(q.events)) + copy(events, q.events) + return events +} + +func swapNotificationQueue(t *testing.T, q notification.MessageQueue) { + t.Helper() + notificationQueueSwapMu.Lock() + prevQueue := notification.Queue + notification.Queue = q + t.Cleanup(func() { + notification.Queue = prevQueue + notificationQueueSwapMu.Unlock() + }) +} + +func newRenameTestFiler(store *renameTestStore) *filer.Filer { + dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) + masterClient := wdclient.NewMasterClient( + dialOption, + "test", + cluster.FilerType, + pb.ServerAddress("localhost:0"), + "", + "", + *pb.NewServiceDiscoveryFromMap(map[string]pb.ServerAddress{}), + ) + + return &filer.Filer{ + Store: filer.NewFilerStoreWrapper(store), + MasterClient: masterClient, + FilerConf: filer.NewFilerConf(), + RemoteStorage: filer.NewFilerRemoteStorage(), + MaxFilenameLength: 255, + LocalMetaLogBuffer: log_buffer.NewLogBuffer( + "test", + time.Minute, + func(*log_buffer.LogBuffer, time.Time, time.Time, []byte, int64, int64) {}, + nil, + func() {}, + ), + } +} + +func newFileEntry(path string, inode uint64) *filer.Entry { + now := time.Unix(1700000000, 0) + return &filer.Entry{ + FullPath: util.FullPath(path), + Attr: filer.Attr{ + Mtime: now, + Crtime: now, + Mode: 0644, + Inode: inode, + }, + } +} + +func newDirectoryEntry(path string, inode uint64) *filer.Entry { + now := time.Unix(1700000000, 0) + return &filer.Entry{ + FullPath: util.FullPath(path), + Attr: filer.Attr{ + Mtime: now, + Crtime: now, + Mode: os.ModeDir | 0755, + Inode: inode, + }, + } +} + +func TestAtomicRenameEntryEmitsLogicalRenameEvent(t *testing.T) { + store := newRenameTestStore() + store.entries["/src.txt"] = newFileEntry("/src.txt", 101) + + queue := &captureQueue{} + swapNotificationQueue(t, queue) + + server := &FilerServer{filer: newRenameTestFiler(store)} + _, err := server.AtomicRenameEntry(context.Background(), &filer_pb.AtomicRenameEntryRequest{ + OldDirectory: "/", + OldName: "src.txt", + NewDirectory: "/", + NewName: "dst.txt", + }) + if err != nil { + t.Fatalf("AtomicRenameEntry: %v", err) + } + + events := queue.snapshot() + if len(events) != 1 { + t.Fatalf("event count = %d, want 1", len(events)) + } + + event := events[0] + if event.key != "/src.txt" { + t.Fatalf("event key = %q, want /src.txt", event.key) + } + if event.notification.OldEntry == nil || event.notification.OldEntry.Name != "src.txt" { + t.Fatalf("old entry = %+v, want src.txt", event.notification.OldEntry) + } + if event.notification.NewEntry == nil || event.notification.NewEntry.Name != "dst.txt" { + t.Fatalf("new entry = %+v, want dst.txt", event.notification.NewEntry) + } + if event.notification.NewParentPath != "/" { + t.Fatalf("new parent path = %q, want /", event.notification.NewParentPath) + } + + if _, err := store.FindEntry(context.Background(), "/src.txt"); err != filer_pb.ErrNotFound { + t.Fatalf("source entry error = %v, want %v", err, filer_pb.ErrNotFound) + } + dst, err := store.FindEntry(context.Background(), "/dst.txt") + if err != nil { + t.Fatalf("find destination: %v", err) + } + if dst.Attr.Inode != 101 { + t.Fatalf("destination inode = %d, want 101", dst.Attr.Inode) + } +} + +func TestAtomicRenameEntryOverwriteEmitsDeleteThenRename(t *testing.T) { + store := newRenameTestStore() + store.entries["/src.txt"] = newFileEntry("/src.txt", 101) + store.entries["/dst.txt"] = newFileEntry("/dst.txt", 202) + + queue := &captureQueue{} + swapNotificationQueue(t, queue) + + server := &FilerServer{filer: newRenameTestFiler(store)} + _, err := server.AtomicRenameEntry(context.Background(), &filer_pb.AtomicRenameEntryRequest{ + OldDirectory: "/", + OldName: "src.txt", + NewDirectory: "/", + NewName: "dst.txt", + }) + if err != nil { + t.Fatalf("AtomicRenameEntry: %v", err) + } + + events := queue.snapshot() + if len(events) != 2 { + t.Fatalf("event count = %d, want 2", len(events)) + } + + deleteEvent := events[0] + if deleteEvent.key != "/dst.txt" { + t.Fatalf("delete event key = %q, want /dst.txt", deleteEvent.key) + } + if deleteEvent.notification.OldEntry == nil || deleteEvent.notification.OldEntry.Name != "dst.txt" { + t.Fatalf("delete old entry = %+v, want dst.txt", deleteEvent.notification.OldEntry) + } + if deleteEvent.notification.NewEntry != nil { + t.Fatalf("delete new entry = %+v, want nil", deleteEvent.notification.NewEntry) + } + if !deleteEvent.notification.DeleteChunks { + t.Fatal("delete event should delete chunks") + } + + renameEvent := events[1] + if renameEvent.key != "/src.txt" { + t.Fatalf("rename event key = %q, want /src.txt", renameEvent.key) + } + if renameEvent.notification.OldEntry == nil || renameEvent.notification.OldEntry.Name != "src.txt" { + t.Fatalf("rename old entry = %+v, want src.txt", renameEvent.notification.OldEntry) + } + if renameEvent.notification.NewEntry == nil || renameEvent.notification.NewEntry.Name != "dst.txt" { + t.Fatalf("rename new entry = %+v, want dst.txt", renameEvent.notification.NewEntry) + } + if renameEvent.notification.NewParentPath != "/" { + t.Fatalf("rename new parent path = %q, want /", renameEvent.notification.NewParentPath) + } + + if _, err := store.FindEntry(context.Background(), "/src.txt"); err != filer_pb.ErrNotFound { + t.Fatalf("source entry error = %v, want %v", err, filer_pb.ErrNotFound) + } + dst, err := store.FindEntry(context.Background(), "/dst.txt") + if err != nil { + t.Fatalf("find destination: %v", err) + } + if dst.Attr.Inode != 101 { + t.Fatalf("destination inode = %d, want 101", dst.Attr.Inode) + } +} + +func TestAtomicRenameEntryDoesNotEmitEventOnDeleteFailure(t *testing.T) { + store := newRenameTestStore() + store.entries["/src.txt"] = newFileEntry("/src.txt", 101) + store.deleteErr = errors.New("delete failed") + + queue := &captureQueue{} + swapNotificationQueue(t, queue) + + server := &FilerServer{filer: newRenameTestFiler(store)} + _, err := server.AtomicRenameEntry(context.Background(), &filer_pb.AtomicRenameEntryRequest{ + OldDirectory: "/", + OldName: "src.txt", + NewDirectory: "/", + NewName: "dst.txt", + }) + if err == nil { + t.Fatal("expected delete failure") + } + + if events := queue.snapshot(); len(events) != 0 { + t.Fatalf("event count = %d, want 0", len(events)) + } +} + +func TestAtomicRenameEntryDoesNotEmitEventOnCommitFailure(t *testing.T) { + store := newRenameTestStore() + store.entries["/src.txt"] = newFileEntry("/src.txt", 101) + store.commitErr = errors.New("commit failed") + + queue := &captureQueue{} + swapNotificationQueue(t, queue) + + server := &FilerServer{filer: newRenameTestFiler(store)} + _, err := server.AtomicRenameEntry(context.Background(), &filer_pb.AtomicRenameEntryRequest{ + OldDirectory: "/", + OldName: "src.txt", + NewDirectory: "/", + NewName: "dst.txt", + }) + if err == nil { + t.Fatal("expected commit failure") + } + + if events := queue.snapshot(); len(events) != 0 { + t.Fatalf("event count = %d, want 0", len(events)) + } +} + +func TestAtomicRenameEntrySkipsDescendantTargetLookups(t *testing.T) { + store := newRenameTestStore() + store.entries["/srcdir"] = newDirectoryEntry("/srcdir", 100) + store.entries["/srcdir/subdir"] = newDirectoryEntry("/srcdir/subdir", 101) + store.entries["/srcdir/subdir/file.txt"] = newFileEntry("/srcdir/subdir/file.txt", 102) + + queue := &captureQueue{} + swapNotificationQueue(t, queue) + + server := &FilerServer{filer: newRenameTestFiler(store)} + _, err := server.AtomicRenameEntry(context.Background(), &filer_pb.AtomicRenameEntryRequest{ + OldDirectory: "/", + OldName: "srcdir", + NewDirectory: "/", + NewName: "dstdir", + }) + if err != nil { + t.Fatalf("AtomicRenameEntry: %v", err) + } + + for _, target := range []string{"/dstdir/subdir", "/dstdir/subdir/file.txt"} { + if calls := store.findEntryCallCount(target); calls != 0 { + t.Fatalf("FindEntry(%q) called %d times, want 0", target, calls) + } + } + + for _, target := range []string{"/dstdir", "/dstdir/subdir", "/dstdir/subdir/file.txt"} { + if _, err := store.FindEntry(context.Background(), util.FullPath(target)); err != nil { + t.Fatalf("find renamed target %q: %v", target, err) + } + } + + if got := len(queue.snapshot()); got != 3 { + t.Fatalf("event count = %d, want 3", got) + } +} diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 16a138182..c436dee3c 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -47,10 +47,10 @@ const ( // current time (backlog catch-up), multiple events are packed into a single // stream.Send() using the Events field. Otherwise events are sent one-by-one. type pipelinedSender struct { - sendCh chan *filer_pb.SubscribeMetadataResponse - errCh chan error - done chan struct{} - canBatch bool // true only if client set ClientSupportsBatching + sendCh chan *filer_pb.SubscribeMetadataResponse + errCh chan error + done chan struct{} + canBatch bool // true only if client set ClientSupportsBatching } func newPipelinedSender(stream metadataStreamSender, bufSize int, clientSupportsBatching bool) *pipelinedSender { @@ -529,31 +529,19 @@ func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRe return nil } - if hasPrefixIn(fullpath, req.PathPrefixes) { - // good - } else if matchByDirectory(dirPath, req.Directories) { - // good - } else { - if !strings.HasPrefix(fullpath, req.PathPrefix) { - if eventNotification.NewParentPath != "" { - newFullPath := util.Join(eventNotification.NewParentPath, entryName) - if !strings.HasPrefix(newFullPath, req.PathPrefix) { - return nil - } - } else { - return nil - } - } - } - - // collect timestamps for path - stats.FilerServerLastSendTsOfSubscribeGauge.WithLabelValues(fs.option.Host.String(), req.ClientName, req.PathPrefix).Set(float64(tsNs)) - message := &filer_pb.SubscribeMetadataResponse{ Directory: dirPath, EventNotification: eventNotification, TsNs: tsNs, } + + if !filer_pb.MetadataEventMatchesSubscription(message, req.PathPrefix, req.PathPrefixes, req.Directories) { + return nil + } + + // collect timestamps for path + stats.FilerServerLastSendTsOfSubscribeGauge.WithLabelValues(fs.option.Host.String(), req.ClientName, req.PathPrefix).Set(float64(tsNs)) + // println("sending", dirPath, entryName) if err := sender.Send(message); err != nil { glog.V(0).Infof("=> client %v: %+v", clientName, err) @@ -564,24 +552,6 @@ func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRe } } -func hasPrefixIn(text string, prefixes []string) bool { - for _, p := range prefixes { - if strings.HasPrefix(text, p) { - return true - } - } - return false -} - -func matchByDirectory(dirPath string, directories []string) bool { - for _, dir := range directories { - if dirPath == dir { - return true - } - } - return false -} - func (fs *FilerServer) addClient(prefix string, clientType string, clientAddress string, clientId int32, clientEpoch int32) (isReplacing, alreadyKnown bool, clientName string) { clientName = clientType + "@" + clientAddress glog.V(0).Infof("+ %v listener %v clientId %v clientEpoch %v", prefix, clientName, clientId, clientEpoch) diff --git a/weed/server/filer_grpc_server_sub_meta_test.go b/weed/server/filer_grpc_server_sub_meta_test.go index 607dec278..2165899a6 100644 --- a/weed/server/filer_grpc_server_sub_meta_test.go +++ b/weed/server/filer_grpc_server_sub_meta_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" ) @@ -25,6 +27,15 @@ func (s *slowStream) Send(msg *filer_pb.SubscribeMetadataResponse) error { return nil } +type collectingStream struct { + messages []*filer_pb.SubscribeMetadataResponse +} + +func (s *collectingStream) Send(msg *filer_pb.SubscribeMetadataResponse) error { + s.messages = append(s.messages, msg) + return nil +} + func makeEvent(dir, name string, tsNs int64) *filer_pb.SubscribeMetadataResponse { return &filer_pb.SubscribeMetadataResponse{ Directory: dir, @@ -69,11 +80,11 @@ func makeRecentEvents(n int) []*filer_pb.SubscribeMetadataResponse { // - Pipelined+batched: file I/O overlaps with batched sending func TestPipelinedSenderThroughput(t *testing.T) { const ( - eventsPerFile = 300 // events in one minute-log file - numFiles = 7 // files to process - totalEvents = eventsPerFile * numFiles // 2100 - fileReadDelay = 5 * time.Millisecond // volume server read per log file - sendDelay = 50 * time.Microsecond // gRPC round-trip per Send() + eventsPerFile = 300 // events in one minute-log file + numFiles = 7 // files to process + totalEvents = eventsPerFile * numFiles // 2100 + fileReadDelay = 5 * time.Millisecond // volume server read per log file + sendDelay = 50 * time.Microsecond // gRPC round-trip per Send() ) // Partition old events into file-sized bursts @@ -138,6 +149,58 @@ func TestPipelinedSenderThroughput(t *testing.T) { } } +func TestEachEventNotificationFnMatchesRenameTargetsForAllWatchTypes(t *testing.T) { + fs := &FilerServer{ + option: &FilerOption{Host: pb.ServerAddress("127.0.0.1:8888")}, + filer: &filer.Filer{Signature: 123}, + } + + tests := []struct { + name string + req *filer_pb.SubscribeMetadataRequest + }{ + { + name: "additional path prefix", + req: &filer_pb.SubscribeMetadataRequest{ + ClientName: "test", + PathPrefix: "/data/", + PathPrefixes: []string{"/etc/remote"}, + }, + }, + { + name: "directory watch", + req: &filer_pb.SubscribeMetadataRequest{ + ClientName: "test", + PathPrefix: "/data/", + Directories: []string{"/etc/iam/identities"}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + stream := &collectingStream{} + eachEventFn := fs.eachEventNotificationFn(tt.req, stream, "client") + + newDir := "/etc/remote" + if len(tt.req.Directories) > 0 { + newDir = tt.req.Directories[0] + } + err := eachEventFn("/tmp", &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{Name: "old"}, + NewEntry: &filer_pb.Entry{Name: "new"}, + NewParentPath: newDir, + }, time.Now().UnixNano()) + if err != nil { + t.Fatalf("eachEventFn: %v", err) + } + if len(stream.messages) != 1 { + t.Fatalf("messages sent = %d, want 1", len(stream.messages)) + } + }) + } +} + // TestBatchingAdaptive verifies the adaptive behavior: old events are batched, // recent events are sent one-by-one. func TestBatchingAdaptive(t *testing.T) { @@ -243,8 +306,8 @@ func TestPipelinedSenderErrorPropagation(t *testing.T) { func TestPipelinedSingleVsParallelStreams(t *testing.T) { const ( numDirs = 10 - filesPerDir = 7 // log files per directory - eventsPerFile = 300 // events per log file + filesPerDir = 7 // log files per directory + eventsPerFile = 300 // events per log file totalEvents = numDirs * filesPerDir * eventsPerFile // 21000 fileReadDelay = 5 * time.Millisecond sendDelay = 50 * time.Microsecond diff --git a/weed/util/fullpath.go b/weed/util/fullpath.go index b485cae0d..9a2b198b7 100644 --- a/weed/util/fullpath.go +++ b/weed/util/fullpath.go @@ -80,6 +80,31 @@ func (fp FullPath) IsUnder(other FullPath) bool { return strings.HasPrefix(string(fp), string(other)+"/") } +// IsEqualOrUnder reports whether candidate is equal to or a descendant of +// other using proper directory boundaries (not a plain string prefix check). +// Empty strings always return false. +func IsEqualOrUnder(candidate, other string) bool { + candidatePath := NormalizePath(candidate) + otherPath := NormalizePath(other) + if candidatePath == "" || otherPath == "" { + return false + } + return candidatePath == otherPath || candidatePath.IsUnder(otherPath) +} + +// NormalizePath trims a trailing slash and returns a FullPath. +// Empty input returns "" (callers should treat this as "no path"). +func NormalizePath(p string) FullPath { + if p == "" { + return "" + } + trimmed := strings.TrimSuffix(p, "/") + if trimmed == "" { + return "/" + } + return FullPath(trimmed) +} + func StringSplit(separatedValues string, sep string) []string { if separatedValues == "" { return nil