diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go index 56c7f7a8c..b7cb855f9 100644 --- a/weed/command/filer_meta_backup.go +++ b/weed/command/filer_meta_backup.go @@ -162,24 +162,21 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error { ctx := context.Background() message := resp.EventNotification - if message.OldEntry == nil && message.NewEntry == nil { + if filer_pb.IsEmpty(resp) { return nil - } - if message.OldEntry == nil && message.NewEntry != nil { + } else if filer_pb.IsCreate(resp) { println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)) entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry) return store.InsertEntry(ctx, entry) - } - if message.OldEntry != nil && message.NewEntry == nil { + } else if filer_pb.IsDelete(resp) { println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name)) return store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name)) - } - if message.OldEntry != nil && message.NewEntry != nil { - if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name { - println("~", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)) - entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry) - return store.UpdateEntry(ctx, entry) - } + } else if filer_pb.IsUpdate(resp) { + println("~", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)) + entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry) + return store.UpdateEntry(ctx, entry) + } else { + // renaming println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name)) if err := store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name)); err != nil { return err diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go index 1158ef1e0..51c4e7128 100644 --- a/weed/command/filer_meta_tail.go +++ b/weed/command/filer_meta_tail.go @@ -74,7 +74,7 @@ func runFilerMetaTail(cmd *Command, args []string) bool { } shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool { - if resp.EventNotification.OldEntry == nil && resp.EventNotification.NewEntry == nil { + if filer_pb.IsEmpty(resp) { return false } if filterFunc == nil { diff --git a/weed/command/filer_remote_gateway_buckets.go b/weed/command/filer_remote_gateway_buckets.go index afe640f5f..cc49a1b95 100644 --- a/weed/command/filer_remote_gateway_buckets.go +++ b/weed/command/filer_remote_gateway_buckets.go @@ -174,10 +174,10 @@ func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *sour return handleEtcRemoteChanges(resp) } - if message.OldEntry == nil && message.NewEntry == nil { + if filer_pb.IsEmpty(resp) { return nil } - if message.OldEntry == nil && message.NewEntry != nil { + if filer_pb.IsCreate(resp) { if message.NewParentPath == option.bucketsDir { return handleCreateBucket(message.NewEntry) } @@ -212,7 +212,7 @@ func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *sour } return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) } - if message.OldEntry != nil && message.NewEntry == nil { + if filer_pb.IsDelete(resp) { if resp.Directory == option.bucketsDir { return handleDeleteBucket(message.OldEntry) } diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go index ccedc9d80..5859645e9 100644 --- a/weed/command/filer_remote_sync_dir.go +++ b/weed/command/filer_remote_sync_dir.go @@ -91,10 +91,10 @@ func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, return handleEtcRemoteChanges(resp) } - if message.OldEntry == nil && message.NewEntry == nil { + if filer_pb.IsEmpty(resp) { return nil } - if message.OldEntry == nil && message.NewEntry != nil { + if filer_pb.IsCreate(resp) { if !filer.HasData(message.NewEntry) { return nil } @@ -115,7 +115,7 @@ func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, } return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) } - if message.OldEntry != nil && message.NewEntry == nil { + if filer_pb.IsDelete(resp) { glog.V(2).Infof("delete: %+v", resp) dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation) if message.OldEntry.IsDirectory { diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 172be6a9a..37ce2aa73 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -262,7 +262,7 @@ func genProcessFunction(sourcePath string, targetPath string, dataSink sink.Repl } // handle deletions - if message.OldEntry != nil && message.NewEntry == nil { + if filer_pb.IsDelete(resp) { if !strings.HasPrefix(string(sourceOldKey), sourcePath) { return nil } @@ -271,7 +271,7 @@ func genProcessFunction(sourcePath string, targetPath string, dataSink sink.Repl } // handle new entries - if message.OldEntry == nil && message.NewEntry != nil { + if filer_pb.IsCreate(resp) { if !strings.HasPrefix(string(sourceNewKey), sourcePath) { return nil } @@ -280,7 +280,7 @@ func genProcessFunction(sourcePath string, targetPath string, dataSink sink.Repl } // this is something special? - if message.OldEntry == nil && message.NewEntry == nil { + if filer_pb.IsEmpty(resp) { return nil } diff --git a/weed/filer/filer_on_meta_event.go b/weed/filer/filer_on_meta_event.go index 720e019f4..3b290deca 100644 --- a/weed/filer/filer_on_meta_event.go +++ b/weed/filer/filer_on_meta_event.go @@ -22,12 +22,12 @@ func (f *Filer) onBucketEvents(event *filer_pb.SubscribeMetadataResponse) { } } if f.DirBucketsPath == event.Directory { - if message.OldEntry == nil && message.NewEntry != nil { + if filer_pb.IsCreate(event) { if message.NewEntry.IsDirectory { f.Store.OnBucketCreation(message.NewEntry.Name) } } - if message.OldEntry != nil && message.NewEntry == nil { + if filer_pb.IsDelete(event) { if message.OldEntry.IsDirectory { f.Store.OnBucketDeletion(message.OldEntry.Name) } diff --git a/weed/mount/meta_cache/meta_cache_subscribe.go b/weed/mount/meta_cache/meta_cache_subscribe.go index 881fee08f..a0c5935ca 100644 --- a/weed/mount/meta_cache/meta_cache_subscribe.go +++ b/weed/mount/meta_cache/meta_cache_subscribe.go @@ -45,9 +45,9 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil newKey := util.NewFullPath(dir, message.NewEntry.Name) mc.invalidateFunc(newKey, message.NewEntry) } - } else if message.OldEntry == nil && message.NewEntry != nil { + } else if filer_pb.IsCreate(resp) { // no need to invaalidate - } else if message.OldEntry != nil && message.NewEntry == nil { + } else if filer_pb.IsDelete(resp) { oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name) mc.invalidateFunc(oldKey, message.OldEntry) } diff --git a/weed/pb/filer_pb/filer_pb_helper.go b/weed/pb/filer_pb/filer_pb_helper.go index 052b09531..5f613a55d 100644 --- a/weed/pb/filer_pb/filer_pb_helper.go +++ b/weed/pb/filer_pb/filer_pb_helper.go @@ -136,6 +136,9 @@ func LookupEntry(client SeaweedFilerClient, request *LookupDirectoryEntryRequest var ErrNotFound = errors.New("filer: no entry is found in filer store") +func IsEmpty(event *SubscribeMetadataResponse) bool { + return event.EventNotification.NewEntry == nil && event.EventNotification.OldEntry == nil +} func IsCreate(event *SubscribeMetadataResponse) bool { return event.EventNotification.NewEntry != nil && event.EventNotification.OldEntry == nil }