You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

840 lines
25 KiB

package meta_cache
import (
"context"
"errors"
"os"
"sync"
"time"
"golang.org/x/sync/singleflight"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/filer/leveldb"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/protobuf/proto"
)
// need to have logic similar to FilerStoreWrapper
// e.g. fill fileId field for chunks
type MetaCache struct {
root util.FullPath
localStore filer.VirtualFilerStore
leveldbStore *leveldb.LevelDBStore // direct reference for batch operations
sync.RWMutex
uidGidMapper *UidGidMapper
markCachedFn func(fullpath util.FullPath)
isCachedFn func(fullpath util.FullPath) bool
invalidateFunc func(fullpath util.FullPath, entry *filer_pb.Entry)
onDirectoryUpdate func(dir util.FullPath)
visitGroup singleflight.Group // deduplicates concurrent EnsureVisited calls for the same path
applyCh chan metadataApplyRequest
applyDone chan struct{}
applyStateMu sync.Mutex
applyClosed bool
buildingDirs map[util.FullPath]*directoryBuildState
dedupRing dedupRingBuffer
}
var errMetaCacheClosed = errors.New("metadata cache is shut down")
type MetadataResponseApplyOptions struct {
NotifyDirectories bool
InvalidateEntries bool
}
var (
LocalMetadataResponseApplyOptions = MetadataResponseApplyOptions{
NotifyDirectories: true,
}
SubscriberMetadataResponseApplyOptions = MetadataResponseApplyOptions{
NotifyDirectories: true,
InvalidateEntries: true,
}
)
type directoryBuildState struct {
bufferedEvents []*filer_pb.SubscribeMetadataResponse
}
const recentEventDedupWindow = 4096
type metadataApplyRequestKind int
const (
metadataApplyEvent metadataApplyRequestKind = iota
metadataBeginBuild
metadataCompleteBuild
metadataAbortBuild
metadataShutdown
)
type metadataApplyRequest struct {
ctx context.Context
kind metadataApplyRequestKind
resp *filer_pb.SubscribeMetadataResponse
options MetadataResponseApplyOptions
buildPath util.FullPath
snapshotTsNs int64
done chan error
}
func NewMetaCache(dbFolder string, uidGidMapper *UidGidMapper, root util.FullPath,
markCachedFn func(path util.FullPath), isCachedFn func(path util.FullPath) bool, invalidateFunc func(util.FullPath, *filer_pb.Entry), onDirectoryUpdate func(dir util.FullPath)) *MetaCache {
leveldbStore, virtualStore := openMetaStore(dbFolder)
mc := &MetaCache{
root: root,
localStore: virtualStore,
leveldbStore: leveldbStore,
markCachedFn: markCachedFn,
isCachedFn: isCachedFn,
uidGidMapper: uidGidMapper,
onDirectoryUpdate: onDirectoryUpdate,
invalidateFunc: func(fullpath util.FullPath, entry *filer_pb.Entry) {
invalidateFunc(fullpath, entry)
},
applyCh: make(chan metadataApplyRequest, 128),
applyDone: make(chan struct{}),
buildingDirs: make(map[util.FullPath]*directoryBuildState),
dedupRing: newDedupRingBuffer(),
}
go mc.runApplyLoop()
return mc
}
func openMetaStore(dbFolder string) (*leveldb.LevelDBStore, filer.VirtualFilerStore) {
os.RemoveAll(dbFolder)
os.MkdirAll(dbFolder, 0755)
store := &leveldb.LevelDBStore{}
config := &cacheConfig{
dir: dbFolder,
}
if err := store.Initialize(config, ""); err != nil {
glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err)
}
return store, filer.NewFilerStoreWrapper(store)
}
func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer.Entry) error {
mc.Lock()
defer mc.Unlock()
return mc.doInsertEntry(ctx, entry)
}
func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) error {
return mc.localStore.InsertEntry(ctx, entry)
}
// doBatchInsertEntries inserts multiple entries using LevelDB's batch write.
// This is more efficient than inserting entries one by one.
func (mc *MetaCache) doBatchInsertEntries(ctx context.Context, entries []*filer.Entry) error {
return mc.leveldbStore.BatchInsertEntries(ctx, entries)
}
func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error {
mc.Lock()
defer mc.Unlock()
return mc.atomicUpdateEntryFromFilerLocked(ctx, oldPath, newEntry, false)
}
func (mc *MetaCache) atomicUpdateEntryFromFilerLocked(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry, allowUncachedInsert bool) error {
entry, err := mc.localStore.FindEntry(ctx, oldPath)
if err != nil && err != filer_pb.ErrNotFound {
glog.Errorf("Metacache: find entry error: %v", err)
return err
}
if entry != nil {
if oldPath != "" {
if newEntry != nil && oldPath == newEntry.FullPath {
// skip the unnecessary deletion
// leave the update to the following InsertEntry operation
} else {
ctx = context.WithValue(ctx, "OP", "MV")
glog.V(3).Infof("DeleteEntry %s", oldPath)
if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil {
return err
}
}
}
} else {
// println("unknown old directory:", oldDir)
}
if newEntry != nil {
newDir, _ := newEntry.DirAndName()
if allowUncachedInsert || mc.isCachedFn(util.FullPath(newDir)) {
glog.V(3).Infof("InsertEntry %s/%s", newDir, newEntry.Name())
if err := mc.localStore.InsertEntry(ctx, newEntry); err != nil {
return err
}
}
}
return nil
}
func (mc *MetaCache) ApplyMetadataResponse(ctx context.Context, resp *filer_pb.SubscribeMetadataResponse, options MetadataResponseApplyOptions) error {
if resp == nil || resp.EventNotification == nil {
return nil
}
clonedResp := proto.Clone(resp).(*filer_pb.SubscribeMetadataResponse)
return mc.applyMetadataResponseEnqueue(ctx, clonedResp, options)
}
// ApplyMetadataResponseOwned is like ApplyMetadataResponse but takes ownership
// of resp without cloning. The caller must not use resp after this call.
func (mc *MetaCache) ApplyMetadataResponseOwned(ctx context.Context, resp *filer_pb.SubscribeMetadataResponse, options MetadataResponseApplyOptions) error {
if resp == nil || resp.EventNotification == nil {
return nil
}
return mc.applyMetadataResponseEnqueue(ctx, resp, options)
}
func (mc *MetaCache) applyMetadataResponseEnqueue(ctx context.Context, resp *filer_pb.SubscribeMetadataResponse, options MetadataResponseApplyOptions) error {
if ctx == nil {
ctx = context.Background()
}
req := metadataApplyRequest{
// Use a non-cancellable context for the queued mutation so a
// cancelled caller doesn't abort the apply loop mid-write.
ctx: context.Background(),
kind: metadataApplyEvent,
resp: resp,
options: options,
done: make(chan error, 1),
}
if err := mc.enqueueApplyRequest(req); err != nil {
return err
}
select {
case err := <-req.done:
return err
case <-ctx.Done():
return ctx.Err()
}
}
func (mc *MetaCache) BeginDirectoryBuild(ctx context.Context, dirPath util.FullPath) error {
return mc.enqueueAndWait(ctx, metadataApplyRequest{
kind: metadataBeginBuild,
buildPath: dirPath,
})
}
func (mc *MetaCache) CompleteDirectoryBuild(ctx context.Context, dirPath util.FullPath, snapshotTsNs int64) error {
return mc.enqueueAndWait(ctx, metadataApplyRequest{
kind: metadataCompleteBuild,
buildPath: dirPath,
snapshotTsNs: snapshotTsNs,
})
}
func (mc *MetaCache) AbortDirectoryBuild(ctx context.Context, dirPath util.FullPath) error {
return mc.enqueueAndWait(ctx, metadataApplyRequest{
kind: metadataAbortBuild,
buildPath: dirPath,
})
}
func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
mc.Lock()
defer mc.Unlock()
return mc.localStore.UpdateEntry(ctx, entry)
}
func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer.Entry, err error) {
mc.RLock()
defer mc.RUnlock()
entry, err = mc.localStore.FindEntry(ctx, fp)
if err != nil {
return nil, err
}
if entry.TtlSec > 0 && entry.Crtime.Add(time.Duration(entry.TtlSec)*time.Second).Before(time.Now()) {
return nil, filer_pb.ErrNotFound
}
mc.mapIdFromFilerToLocal(entry)
return
}
func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
mc.Lock()
defer mc.Unlock()
return mc.localStore.DeleteEntry(ctx, fp)
}
func (mc *MetaCache) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
mc.Lock()
defer mc.Unlock()
return mc.localStore.DeleteFolderChildren(ctx, fp)
}
func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) error {
mc.RLock()
defer mc.RUnlock()
if !mc.isCachedFn(dirPath) {
// if this request comes after renaming, it should be fine
glog.Warningf("unsynchronized dir: %v", dirPath)
}
_, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) (bool, error) {
if entry.TtlSec > 0 && entry.Crtime.Add(time.Duration(entry.TtlSec)*time.Second).Before(time.Now()) {
return true, nil
}
mc.mapIdFromFilerToLocal(entry)
return eachEntryFunc(entry)
})
if err != nil {
return err
}
return err
}
func (mc *MetaCache) Shutdown() {
done := make(chan error, 1)
mc.applyStateMu.Lock()
if !mc.applyClosed {
mc.applyClosed = true
mc.applyCh <- metadataApplyRequest{
kind: metadataShutdown,
done: done,
}
}
mc.applyStateMu.Unlock()
select {
case <-done:
case <-mc.applyDone:
}
<-mc.applyDone
mc.Lock()
defer mc.Unlock()
mc.localStore.Shutdown()
}
func (mc *MetaCache) mapIdFromFilerToLocal(entry *filer.Entry) {
entry.Attr.Uid, entry.Attr.Gid = mc.uidGidMapper.FilerToLocal(entry.Attr.Uid, entry.Attr.Gid)
}
func (mc *MetaCache) Debug() {
if debuggable, ok := mc.localStore.(filer.Debuggable); ok {
println("start debugging")
debuggable.Debug(os.Stderr)
}
}
// IsDirectoryCached returns true if the directory has been fully cached
// (i.e., all entries have been loaded via EnsureVisited or ReadDir).
func (mc *MetaCache) IsDirectoryCached(dirPath util.FullPath) bool {
return mc.isCachedFn(dirPath)
}
func (mc *MetaCache) noteDirectoryUpdate(dirPath util.FullPath) {
if mc.onDirectoryUpdate != nil {
mc.onDirectoryUpdate(dirPath)
}
}
func (mc *MetaCache) enqueueAndWait(ctx context.Context, req metadataApplyRequest) error {
if ctx == nil {
ctx = context.Background()
}
// Use a non-cancellable context for the queued operation so a
// cancelled caller doesn't abort a build/complete mid-way.
req.ctx = context.Background()
req.done = make(chan error, 1)
if err := mc.enqueueApplyRequest(req); err != nil {
return err
}
select {
case err := <-req.done:
return err
case <-ctx.Done():
return ctx.Err()
}
}
func (mc *MetaCache) enqueueApplyRequest(req metadataApplyRequest) error {
mc.applyStateMu.Lock()
if mc.applyClosed {
mc.applyStateMu.Unlock()
return errMetaCacheClosed
}
// Release the mutex before the potentially-blocking channel send so that
// Shutdown can still acquire it to set applyClosed when the channel is full.
mc.applyStateMu.Unlock()
select {
case mc.applyCh <- req:
return nil
case <-mc.applyDone:
return errMetaCacheClosed
}
}
func (mc *MetaCache) runApplyLoop() {
defer close(mc.applyDone)
for req := range mc.applyCh {
req.done <- mc.handleApplyRequest(req)
close(req.done)
if req.kind == metadataShutdown {
mc.drainApplyCh()
return
}
}
}
// drainApplyCh non-blockingly drains any remaining requests from applyCh
// after a shutdown sentinel, signalling each caller so they don't block.
func (mc *MetaCache) drainApplyCh() {
for {
select {
case req := <-mc.applyCh:
req.done <- errMetaCacheClosed
close(req.done)
default:
return
}
}
}
func (mc *MetaCache) handleApplyRequest(req metadataApplyRequest) error {
switch req.kind {
case metadataApplyEvent:
return mc.applyMetadataResponseNow(req.ctx, req.resp, req.options)
case metadataBeginBuild:
return mc.beginDirectoryBuildNow(req.buildPath)
case metadataCompleteBuild:
return mc.completeDirectoryBuildNow(req.ctx, req.buildPath, req.snapshotTsNs)
case metadataAbortBuild:
return mc.abortDirectoryBuildNow(req.buildPath)
case metadataShutdown:
return nil
default:
return nil
}
}
type metadataInvalidation struct {
path util.FullPath
entry *filer_pb.Entry
}
type metadataResponseSideEffects struct {
dirsToNotify []util.FullPath
invalidations []metadataInvalidation
}
func (mc *MetaCache) applyMetadataResponseNow(ctx context.Context, resp *filer_pb.SubscribeMetadataResponse, options MetadataResponseApplyOptions) error {
if mc.shouldSkipDuplicateEvent(resp) {
return nil
}
immediateEvents, bufferedEvents := mc.routeMetadataResponse(resp)
if len(bufferedEvents) == 0 {
return mc.applyMetadataResponseDirect(ctx, resp, options, false)
}
// Apply side effects but skip directory notifications for dirs that are
// currently being built. Notifying a building dir can trigger
// markDirectoryReadThrough → DeleteFolderChildren, wiping entries that
// EnsureVisited already inserted, leaving an incomplete cache.
mc.applyMetadataSideEffectsSkippingBuildingDirs(resp, options)
for buildDir, events := range bufferedEvents {
state := mc.buildingDirs[buildDir]
if state == nil {
continue
}
state.bufferedEvents = append(state.bufferedEvents, events...)
}
for _, immediateEvent := range immediateEvents {
if err := mc.applyMetadataResponseDirect(ctx, immediateEvent, MetadataResponseApplyOptions{}, false); err != nil {
return err
}
}
return nil
}
func (mc *MetaCache) applyMetadataResponseDirect(ctx context.Context, resp *filer_pb.SubscribeMetadataResponse, options MetadataResponseApplyOptions, allowUncachedInsert bool) error {
if _, err := mc.applyMetadataResponseLocked(ctx, resp, options, allowUncachedInsert); err != nil {
return err
}
mc.applyMetadataSideEffects(resp, options)
return nil
}
func (mc *MetaCache) applyMetadataSideEffects(resp *filer_pb.SubscribeMetadataResponse, options MetadataResponseApplyOptions) {
sideEffects := metadataResponseSideEffects{}
if options.NotifyDirectories {
sideEffects.dirsToNotify = collectDirectoryNotifications(resp)
}
if options.InvalidateEntries {
sideEffects.invalidations = collectEntryInvalidations(resp)
}
for _, dirPath := range sideEffects.dirsToNotify {
mc.noteDirectoryUpdate(dirPath)
}
for _, invalidation := range sideEffects.invalidations {
mc.invalidateFunc(invalidation.path, invalidation.entry)
}
}
// applyMetadataSideEffectsSkippingBuildingDirs is like applyMetadataSideEffects
// but suppresses directory notifications for dirs currently in buildingDirs.
// This prevents markDirectoryReadThrough from wiping entries mid-build.
func (mc *MetaCache) applyMetadataSideEffectsSkippingBuildingDirs(resp *filer_pb.SubscribeMetadataResponse, options MetadataResponseApplyOptions) {
sideEffects := metadataResponseSideEffects{}
if options.NotifyDirectories {
sideEffects.dirsToNotify = collectDirectoryNotifications(resp)
}
if options.InvalidateEntries {
sideEffects.invalidations = collectEntryInvalidations(resp)
}
for _, dirPath := range sideEffects.dirsToNotify {
if _, building := mc.buildingDirs[dirPath]; !building {
mc.noteDirectoryUpdate(dirPath)
}
}
for _, invalidation := range sideEffects.invalidations {
mc.invalidateFunc(invalidation.path, invalidation.entry)
}
}
func (mc *MetaCache) applyMetadataResponseLocked(ctx context.Context, resp *filer_pb.SubscribeMetadataResponse, _ MetadataResponseApplyOptions, allowUncachedInsert bool) (metadataResponseSideEffects, error) {
message := resp.GetEventNotification()
if message == nil {
return metadataResponseSideEffects{}, nil
}
var oldPath util.FullPath
var newEntry *filer.Entry
if message.OldEntry != nil {
oldPath = util.NewFullPath(resp.Directory, message.OldEntry.Name)
}
if message.NewEntry != nil {
dir := resp.Directory
if message.NewParentPath != "" {
dir = message.NewParentPath
}
newEntry = filer.FromPbEntry(dir, message.NewEntry)
}
mc.Lock()
err := mc.atomicUpdateEntryFromFilerLocked(ctx, oldPath, newEntry, allowUncachedInsert)
// When a directory is deleted or moved, remove its cached descendants
// so stale children cannot be served from the local cache.
if err == nil && oldPath != "" && message.OldEntry != nil && message.OldEntry.IsDirectory {
isDelete := message.NewEntry == nil
isMove := message.NewEntry != nil && (message.NewParentPath != resp.Directory || message.NewEntry.Name != message.OldEntry.Name)
if isDelete || isMove {
if deleteErr := mc.localStore.DeleteFolderChildren(ctx, oldPath); deleteErr != nil {
glog.V(2).Infof("delete descendants of %s: %v", oldPath, deleteErr)
}
}
}
mc.Unlock()
if err != nil {
return metadataResponseSideEffects{}, err
}
return metadataResponseSideEffects{}, nil
}
func (mc *MetaCache) beginDirectoryBuildNow(dirPath util.FullPath) error {
if _, found := mc.buildingDirs[dirPath]; found {
return nil
}
mc.buildingDirs[dirPath] = &directoryBuildState{}
return nil
}
func (mc *MetaCache) abortDirectoryBuildNow(dirPath util.FullPath) error {
delete(mc.buildingDirs, dirPath)
return nil
}
func (mc *MetaCache) completeDirectoryBuildNow(ctx context.Context, dirPath util.FullPath, snapshotTsNs int64) error {
state := mc.buildingDirs[dirPath]
delete(mc.buildingDirs, dirPath)
if state == nil {
return nil
}
for _, event := range state.bufferedEvents {
// When the server provided a snapshot timestamp, skip events that
// the listing already included. When snapshotTsNs == 0 (empty
// directory — server returned no entries and no snapshot), replay
// ALL buffered events to avoid dropping mutations due to
// client/server clock skew.
if snapshotTsNs != 0 && event.TsNs != 0 && event.TsNs <= snapshotTsNs {
continue
}
if err := mc.applyMetadataResponseDirect(ctx, event, MetadataResponseApplyOptions{}, true); err != nil {
return err
}
}
mc.markCachedFn(dirPath)
return nil
}
func (mc *MetaCache) routeMetadataResponse(resp *filer_pb.SubscribeMetadataResponse) ([]*filer_pb.SubscribeMetadataResponse, map[util.FullPath][]*filer_pb.SubscribeMetadataResponse) {
message := resp.GetEventNotification()
if message == nil {
return []*filer_pb.SubscribeMetadataResponse{resp}, nil
}
oldDir, hasOld := metadataOldParentDir(resp)
newDir, hasNew := metadataNewParentDir(resp)
oldBuilding := hasOld && mc.isBuildingDir(oldDir)
newBuilding := hasNew && mc.isBuildingDir(newDir)
if !oldBuilding && !newBuilding {
return []*filer_pb.SubscribeMetadataResponse{resp}, nil
}
bufferedEvents := make(map[util.FullPath][]*filer_pb.SubscribeMetadataResponse)
var immediateEvents []*filer_pb.SubscribeMetadataResponse
if hasOld && hasNew && oldDir != newDir {
deleteEvent := metadataDeleteFragment(resp)
createEvent := metadataCreateFragment(resp)
if oldBuilding {
bufferedEvents[oldDir] = append(bufferedEvents[oldDir], deleteEvent)
} else {
immediateEvents = append(immediateEvents, deleteEvent)
}
if newBuilding {
bufferedEvents[newDir] = append(bufferedEvents[newDir], createEvent)
} else {
immediateEvents = append(immediateEvents, createEvent)
}
return immediateEvents, bufferedEvents
}
targetDir := newDir
if hasOld {
targetDir = oldDir
}
if mc.isBuildingDir(targetDir) {
bufferedEvents[targetDir] = append(bufferedEvents[targetDir], resp)
return nil, bufferedEvents
}
return []*filer_pb.SubscribeMetadataResponse{resp}, nil
}
func (mc *MetaCache) isBuildingDir(dirPath util.FullPath) bool {
_, found := mc.buildingDirs[dirPath]
return found
}
func metadataOldParentDir(resp *filer_pb.SubscribeMetadataResponse) (util.FullPath, bool) {
if resp.GetEventNotification() == nil || resp.EventNotification.OldEntry == nil {
return "", false
}
return util.FullPath(resp.Directory), true
}
func metadataNewParentDir(resp *filer_pb.SubscribeMetadataResponse) (util.FullPath, bool) {
if resp.GetEventNotification() == nil || resp.EventNotification.NewEntry == nil {
return "", false
}
newDir := resp.Directory
if resp.EventNotification.NewParentPath != "" {
newDir = resp.EventNotification.NewParentPath
}
return util.FullPath(newDir), true
}
func metadataDeleteFragment(resp *filer_pb.SubscribeMetadataResponse) *filer_pb.SubscribeMetadataResponse {
if resp.GetEventNotification() == nil || resp.EventNotification.OldEntry == nil {
return nil
}
return &filer_pb.SubscribeMetadataResponse{
Directory: resp.Directory,
EventNotification: &filer_pb.EventNotification{
OldEntry: proto.Clone(resp.EventNotification.OldEntry).(*filer_pb.Entry),
},
TsNs: resp.TsNs,
}
}
func metadataCreateFragment(resp *filer_pb.SubscribeMetadataResponse) *filer_pb.SubscribeMetadataResponse {
if resp.GetEventNotification() == nil || resp.EventNotification.NewEntry == nil {
return nil
}
newDir := resp.Directory
if resp.EventNotification.NewParentPath != "" {
newDir = resp.EventNotification.NewParentPath
}
return &filer_pb.SubscribeMetadataResponse{
Directory: newDir,
EventNotification: &filer_pb.EventNotification{
NewEntry: proto.Clone(resp.EventNotification.NewEntry).(*filer_pb.Entry),
NewParentPath: newDir,
},
TsNs: resp.TsNs,
}
}
func metadataEventDedupKey(resp *filer_pb.SubscribeMetadataResponse) string {
var oldName, newName, newParent string
hasOld, hasNew := false, false
if msg := resp.GetEventNotification(); msg != nil {
if msg.OldEntry != nil {
oldName = msg.OldEntry.Name
hasOld = true
}
if msg.NewEntry != nil {
newName = msg.NewEntry.Name
hasNew = true
newParent = msg.NewParentPath
}
}
// Encode event shape (create/delete/update/rename) so structurally
// different events with the same names are not collapsed.
var shape byte
switch {
case hasOld && hasNew:
if resp.Directory != newParent && newParent != "" {
shape = 'R' // rename across directories
} else {
shape = 'U' // update in place
}
case hasOld:
shape = 'D' // delete
case hasNew:
shape = 'C' // create
}
return fmt.Sprintf("%d|%c|%s|%s|%s|%s", resp.TsNs, shape, resp.Directory, oldName, newParent, newName)
}
func (mc *MetaCache) shouldSkipDuplicateEvent(resp *filer_pb.SubscribeMetadataResponse) bool {
if resp == nil || resp.TsNs == 0 {
return false
}
key := metadataEventDedupKey(resp)
return !mc.dedupRing.Add(key)
}
type dedupRingBuffer struct {
keys [recentEventDedupWindow]string
head int
size int
set map[string]struct{}
}
func newDedupRingBuffer() dedupRingBuffer {
return dedupRingBuffer{
set: make(map[string]struct{}, recentEventDedupWindow),
}
}
func (r *dedupRingBuffer) Add(key string) bool {
if _, found := r.set[key]; found {
return false // duplicate
}
if r.size == recentEventDedupWindow {
evicted := r.keys[r.head]
delete(r.set, evicted)
} else {
r.size++
}
r.keys[r.head] = key
r.set[key] = struct{}{}
r.head = (r.head + 1) % recentEventDedupWindow
return true // new entry
}
func collectDirectoryNotifications(resp *filer_pb.SubscribeMetadataResponse) []util.FullPath {
message := resp.GetEventNotification()
if message == nil {
return nil
}
// At most 3 dirs: old parent, new parent, new child (if directory).
// Use a fixed slice with linear dedup to avoid map allocation.
var dirs [3]util.FullPath
n := 0
addUnique := func(p util.FullPath) {
for i := 0; i < n; i++ {
if dirs[i] == p {
return
}
}
dirs[n] = p
n++
}
if message.OldEntry != nil {
oldPath := util.NewFullPath(resp.Directory, message.OldEntry.Name)
parent, _ := oldPath.DirAndName()
addUnique(util.FullPath(parent))
}
if message.NewEntry != nil {
newDir := resp.Directory
if message.NewParentPath != "" {
newDir = message.NewParentPath
}
newPath := util.NewFullPath(newDir, message.NewEntry.Name)
parent, _ := newPath.DirAndName()
addUnique(util.FullPath(parent))
if message.NewEntry.IsDirectory {
addUnique(newPath)
}
}
return dirs[:n]
}
func collectEntryInvalidations(resp *filer_pb.SubscribeMetadataResponse) []metadataInvalidation {
message := resp.GetEventNotification()
if message == nil {
return nil
}
var invalidations []metadataInvalidation
if message.OldEntry != nil && message.NewEntry != nil {
oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
invalidations = append(invalidations, metadataInvalidation{path: oldKey, entry: message.OldEntry})
// Normalize NewParentPath: empty means same directory as resp.Directory
newDir := resp.Directory
if message.NewParentPath != "" {
newDir = message.NewParentPath
}
if message.OldEntry.Name != message.NewEntry.Name || resp.Directory != newDir {
newKey := util.NewFullPath(newDir, message.NewEntry.Name)
invalidations = append(invalidations, metadataInvalidation{path: newKey, entry: message.NewEntry})
}
return invalidations
}
if filer_pb.IsCreate(resp) && message.NewEntry != nil {
newDir := resp.Directory
if message.NewParentPath != "" {
newDir = message.NewParentPath
}
newKey := util.NewFullPath(newDir, message.NewEntry.Name)
invalidations = append(invalidations, metadataInvalidation{path: newKey, entry: message.NewEntry})
}
if filer_pb.IsDelete(resp) && message.OldEntry != nil {
oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
invalidations = append(invalidations, metadataInvalidation{path: oldKey, entry: message.OldEntry})
}
return invalidations
}