From c0e36231ad1fcddc5e436d1419a2e4d99fb27002 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 6 Oct 2024 12:55:19 -0700 Subject: [PATCH] use only one metadata follow process --- weed/mount/filer_conf.go | 48 +++---------------- weed/mount/meta_cache/meta_cache_subscribe.go | 41 ++++++++++++++-- weed/mount/weedfs.go | 6 +-- 3 files changed, 46 insertions(+), 49 deletions(-) diff --git a/weed/mount/filer_conf.go b/weed/mount/filer_conf.go index 8b7d10dbc..bb5f33ce3 100644 --- a/weed/mount/filer_conf.go +++ b/weed/mount/filer_conf.go @@ -3,19 +3,15 @@ package mount import ( "errors" "fmt" - "path/filepath" - "sync/atomic" - "time" - "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/mount/meta_cache" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" + "path/filepath" ) -func (wfs *WFS) subscribeFilerConfEvents() (func(), error) { - now := time.Now() +func (wfs *WFS) subscribeFilerConfEvents() (*meta_cache.MetadataFollower, error) { confDir := filer.DirectoryEtcSeaweedFS confName := filer.FilerConfName confFullName := filepath.Join(filer.DirectoryEtcSeaweedFS, filer.FilerConfName) @@ -71,41 +67,9 @@ func (wfs *WFS) subscribeFilerConfEvents() (func(), error) { return nil } - - metadataFollowOption := &pb.MetadataFollowOption{ - ClientName: "fuse", - ClientId: wfs.signature, - ClientEpoch: 1, - SelfSignature: 0, - PathPrefix: confFullName, - AdditionalPathPrefixes: nil, - StartTsNs: now.UnixNano(), - StopTsNs: 0, - EventErrorType: pb.FatalOnError, - } - - return func() { - // sync new conf changes - util.RetryUntil("followFilerConfChanges", func() error { - metadataFollowOption.ClientEpoch++ - i := atomic.LoadInt32(&wfs.option.filerIndex) - n := len(wfs.option.FilerAddresses) - err = pb.FollowMetadata(wfs.option.FilerAddresses[i], wfs.option.GrpcDialOption, metadataFollowOption, processEventFn) - if err == nil { - atomic.StoreInt32(&wfs.option.filerIndex, i) - return nil - } - - i++ - if i >= int32(n) { - i = 0 - } - - return err - }, func(err error) bool { - glog.V(0).Infof("fuse follow filer conf changes: %v", err) - return true - }) + return &meta_cache.MetadataFollower{ + PathPrefixToWatch: confFullName, + ProcessEventFn: processEventFn, }, nil } diff --git a/weed/mount/meta_cache/meta_cache_subscribe.go b/weed/mount/meta_cache/meta_cache_subscribe.go index d3bb27d08..e67045013 100644 --- a/weed/mount/meta_cache/meta_cache_subscribe.go +++ b/weed/mount/meta_cache/meta_cache_subscribe.go @@ -10,7 +10,42 @@ import ( "strings" ) -func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64) error { +type MetadataFollower struct { + PathPrefixToWatch string + ProcessEventFn func(resp *filer_pb.SubscribeMetadataResponse) error +} + +func mergeProceesors(mainProcessor func(resp *filer_pb.SubscribeMetadataResponse) error, followers ...*MetadataFollower) func(resp *filer_pb.SubscribeMetadataResponse) error { + return func(resp *filer_pb.SubscribeMetadataResponse) error { + + // build the full path + entry := resp.EventNotification.NewEntry + if entry == nil { + entry = resp.EventNotification.OldEntry + } + dir := resp.Directory + if resp.EventNotification.NewParentPath != "" { + dir = resp.EventNotification.NewParentPath + } + fp := util.NewFullPath(dir, entry.Name) + + for _, follower := range followers { + if strings.HasPrefix(string(fp), follower.PathPrefixToWatch) { + if err := follower.ProcessEventFn(resp); err != nil { + return err + } + } + } + return mainProcessor(resp) + } +} + +func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64, followers ...*MetadataFollower) error { + + var prefixes []string + for _, follower := range followers { + prefixes = append(prefixes, follower.PathPrefixToWatch) + } processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { message := resp.EventNotification @@ -69,7 +104,7 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil ClientEpoch: 1, SelfSignature: selfSignature, PathPrefix: prefix, - AdditionalPathPrefixes: nil, + AdditionalPathPrefixes: prefixes, DirectoriesToWatch: nil, StartTsNs: lastTsNs, StopTsNs: 0, @@ -77,7 +112,7 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil } util.RetryUntil("followMetaUpdates", func() error { metadataFollowOption.ClientEpoch++ - return pb.WithFilerClientFollowMetadata(client, metadataFollowOption, processEventFn) + return pb.WithFilerClientFollowMetadata(client, metadataFollowOption, mergeProceesors(processEventFn, followers...)) }, func(err error) bool { glog.Errorf("follow metadata updates: %v", err) return true diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index 4f029bba8..ac665ce5e 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -141,15 +141,13 @@ func NewSeaweedFileSystem(option *Option) *WFS { } func (wfs *WFS) StartBackgroundTasks() error { - fn, err := wfs.subscribeFilerConfEvents() + follower, err := wfs.subscribeFilerConfEvents() if err != nil { return err } - go fn() - startTime := time.Now() - go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) + go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano(), follower) go wfs.loopCheckQuota() return nil