Browse Source

use only one metadata follow process

pull/6097/head
chrislu 4 months ago
parent
commit
c0e36231ad
  1. 48
      weed/mount/filer_conf.go
  2. 41
      weed/mount/meta_cache/meta_cache_subscribe.go
  3. 6
      weed/mount/weedfs.go

48
weed/mount/filer_conf.go

@ -3,19 +3,15 @@ package mount
import (
"errors"
"fmt"
"path/filepath"
"sync/atomic"
"time"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"path/filepath"
)
func (wfs *WFS) subscribeFilerConfEvents() (func(), error) {
now := time.Now()
func (wfs *WFS) subscribeFilerConfEvents() (*meta_cache.MetadataFollower, error) {
confDir := filer.DirectoryEtcSeaweedFS
confName := filer.FilerConfName
confFullName := filepath.Join(filer.DirectoryEtcSeaweedFS, filer.FilerConfName)
@ -71,41 +67,9 @@ func (wfs *WFS) subscribeFilerConfEvents() (func(), error) {
return nil
}
metadataFollowOption := &pb.MetadataFollowOption{
ClientName: "fuse",
ClientId: wfs.signature,
ClientEpoch: 1,
SelfSignature: 0,
PathPrefix: confFullName,
AdditionalPathPrefixes: nil,
StartTsNs: now.UnixNano(),
StopTsNs: 0,
EventErrorType: pb.FatalOnError,
}
return func() {
// sync new conf changes
util.RetryUntil("followFilerConfChanges", func() error {
metadataFollowOption.ClientEpoch++
i := atomic.LoadInt32(&wfs.option.filerIndex)
n := len(wfs.option.FilerAddresses)
err = pb.FollowMetadata(wfs.option.FilerAddresses[i], wfs.option.GrpcDialOption, metadataFollowOption, processEventFn)
if err == nil {
atomic.StoreInt32(&wfs.option.filerIndex, i)
return nil
}
i++
if i >= int32(n) {
i = 0
}
return err
}, func(err error) bool {
glog.V(0).Infof("fuse follow filer conf changes: %v", err)
return true
})
return &meta_cache.MetadataFollower{
PathPrefixToWatch: confFullName,
ProcessEventFn: processEventFn,
}, nil
}

41
weed/mount/meta_cache/meta_cache_subscribe.go

@ -10,7 +10,42 @@ import (
"strings"
)
func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64) error {
type MetadataFollower struct {
PathPrefixToWatch string
ProcessEventFn func(resp *filer_pb.SubscribeMetadataResponse) error
}
func mergeProceesors(mainProcessor func(resp *filer_pb.SubscribeMetadataResponse) error, followers ...*MetadataFollower) func(resp *filer_pb.SubscribeMetadataResponse) error {
return func(resp *filer_pb.SubscribeMetadataResponse) error {
// build the full path
entry := resp.EventNotification.NewEntry
if entry == nil {
entry = resp.EventNotification.OldEntry
}
dir := resp.Directory
if resp.EventNotification.NewParentPath != "" {
dir = resp.EventNotification.NewParentPath
}
fp := util.NewFullPath(dir, entry.Name)
for _, follower := range followers {
if strings.HasPrefix(string(fp), follower.PathPrefixToWatch) {
if err := follower.ProcessEventFn(resp); err != nil {
return err
}
}
}
return mainProcessor(resp)
}
}
func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64, followers ...*MetadataFollower) error {
var prefixes []string
for _, follower := range followers {
prefixes = append(prefixes, follower.PathPrefixToWatch)
}
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
@ -69,7 +104,7 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
ClientEpoch: 1,
SelfSignature: selfSignature,
PathPrefix: prefix,
AdditionalPathPrefixes: nil,
AdditionalPathPrefixes: prefixes,
DirectoriesToWatch: nil,
StartTsNs: lastTsNs,
StopTsNs: 0,
@ -77,7 +112,7 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
}
util.RetryUntil("followMetaUpdates", func() error {
metadataFollowOption.ClientEpoch++
return pb.WithFilerClientFollowMetadata(client, metadataFollowOption, processEventFn)
return pb.WithFilerClientFollowMetadata(client, metadataFollowOption, mergeProceesors(processEventFn, followers...))
}, func(err error) bool {
glog.Errorf("follow metadata updates: %v", err)
return true

6
weed/mount/weedfs.go

@ -141,15 +141,13 @@ func NewSeaweedFileSystem(option *Option) *WFS {
}
func (wfs *WFS) StartBackgroundTasks() error {
fn, err := wfs.subscribeFilerConfEvents()
follower, err := wfs.subscribeFilerConfEvents()
if err != nil {
return err
}
go fn()
startTime := time.Now()
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano(), follower)
go wfs.loopCheckQuota()
return nil

Loading…
Cancel
Save