From ec3378f7a6245c8184afdfc1f070bbc76004033e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 17 Dec 2025 01:14:01 -0800 Subject: [PATCH] fix: improve mount quota enforcement to prevent overflow (#7804) * fix: improve mount quota enforcement to prevent overflow (fixes seaweedfs-csi-driver#218) * test: add unit tests for quota enforcement --- weed/mount/weedfs_attr.go | 3 +- weed/mount/weedfs_dir_mkrm.go | 2 +- weed/mount/weedfs_file_mkrm.go | 2 +- weed/mount/weedfs_file_sync.go | 6 +- weed/mount/weedfs_file_write.go | 14 +- weed/mount/weedfs_link.go | 2 +- weed/mount/weedfs_quota.go | 158 ++++++++++++++++++----- weed/mount/weedfs_quota_test.go | 218 ++++++++++++++++++++++++++++++++ weed/mount/weedfs_rename.go | 2 +- weed/mount/weedfs_symlink.go | 2 +- weed/mount/weedfs_xattr.go | 2 +- 11 files changed, 369 insertions(+), 42 deletions(-) create mode 100644 weed/mount/weedfs_quota_test.go diff --git a/weed/mount/weedfs_attr.go b/weed/mount/weedfs_attr.go index d8ca4bc6a..b79b0629a 100644 --- a/weed/mount/weedfs_attr.go +++ b/weed/mount/weedfs_attr.go @@ -42,7 +42,8 @@ func (wfs *WFS) GetAttr(cancel <-chan struct{}, input *fuse.GetAttrIn, out *fuse func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse.AttrOut) (code fuse.Status) { - if wfs.IsOverQuota { + // Check quota including uncommitted writes for real-time enforcement + if wfs.IsOverQuotaWithUncommitted() { return fuse.Status(syscall.ENOSPC) } diff --git a/weed/mount/weedfs_dir_mkrm.go b/weed/mount/weedfs_dir_mkrm.go index 503d6a076..367270bee 100644 --- a/weed/mount/weedfs_dir_mkrm.go +++ b/weed/mount/weedfs_dir_mkrm.go @@ -23,7 +23,7 @@ import ( * */ func (wfs *WFS) Mkdir(cancel <-chan struct{}, in *fuse.MkdirIn, name string, out *fuse.EntryOut) (code fuse.Status) { - if wfs.IsOverQuota { + if wfs.IsOverQuotaWithUncommitted() { return fuse.Status(syscall.ENOSPC) } diff --git a/weed/mount/weedfs_file_mkrm.go b/weed/mount/weedfs_file_mkrm.go index e48d5af40..09e50b488 100644 --- 
a/weed/mount/weedfs_file_mkrm.go +++ b/weed/mount/weedfs_file_mkrm.go @@ -37,7 +37,7 @@ func (wfs *WFS) Create(cancel <-chan struct{}, in *fuse.CreateIn, name string, o */ func (wfs *WFS) Mknod(cancel <-chan struct{}, in *fuse.MknodIn, name string, out *fuse.EntryOut) (code fuse.Status) { - if wfs.IsOverQuota { + if wfs.IsOverQuotaWithUncommitted() { return fuse.Status(syscall.ENOSPC) } diff --git a/weed/mount/weedfs_file_sync.go b/weed/mount/weedfs_file_sync.go index eda5ad8da..f86f839ca 100644 --- a/weed/mount/weedfs_file_sync.go +++ b/weed/mount/weedfs_file_sync.go @@ -100,7 +100,9 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status { // send the data to the OS glog.V(4).Infof("doFlush %s fh %d", fileFullPath, fh.fh) - if !wfs.IsOverQuota { + // Check quota including uncommitted writes for real-time enforcement + isOverQuota := wfs.IsOverQuotaWithUncommitted() + if !isOverQuota { if err := fh.dirtyPages.FlushData(); err != nil { glog.Errorf("%v doFlush: %v", fileFullPath, err) return fuse.EIO @@ -111,7 +113,7 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status { return fuse.OK } - if wfs.IsOverQuota { + if isOverQuota { return fuse.Status(syscall.ENOSPC) } diff --git a/weed/mount/weedfs_file_write.go b/weed/mount/weedfs_file_write.go index 1ec20c294..00511c5cd 100644 --- a/weed/mount/weedfs_file_write.go +++ b/weed/mount/weedfs_file_write.go @@ -36,7 +36,8 @@ import ( */ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (written uint32, code fuse.Status) { - if wfs.IsOverQuota { + // Check quota including uncommitted writes for real-time enforcement + if wfs.IsOverQuotaWithUncommitted() { return 0, fuse.Status(syscall.ENOSPC) } @@ -59,7 +60,16 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr entry.Content = nil offset := int64(in.Offset) - entry.Attributes.FileSize = uint64(max(offset+int64(len(data)), int64(entry.Attributes.FileSize))) + oldFileSize := 
int64(entry.Attributes.FileSize) + newFileSize := max(offset+int64(len(data)), oldFileSize) + entry.Attributes.FileSize = uint64(newFileSize) + + // Track uncommitted bytes for real-time quota enforcement. + // Only count the new bytes being added beyond the current file size. + if newFileSize > oldFileSize { + wfs.AddUncommittedBytes(newFileSize - oldFileSize) + } + // glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data)) fh.dirtyPages.AddPage(offset, data, fh.dirtyPages.writerPattern.IsSequentialMode(), tsNs) diff --git a/weed/mount/weedfs_link.go b/weed/mount/weedfs_link.go index 344b907b4..95e93e1f1 100644 --- a/weed/mount/weedfs_link.go +++ b/weed/mount/weedfs_link.go @@ -25,7 +25,7 @@ When creating a link: /** Create a hard link to a file */ func (wfs *WFS) Link(cancel <-chan struct{}, in *fuse.LinkIn, name string, out *fuse.EntryOut) (code fuse.Status) { - if wfs.IsOverQuota { + if wfs.IsOverQuotaWithUncommitted() { return fuse.Status(syscall.ENOSPC) } diff --git a/weed/mount/weedfs_quota.go b/weed/mount/weedfs_quota.go index 23f487549..df35fa61e 100644 --- a/weed/mount/weedfs_quota.go +++ b/weed/mount/weedfs_quota.go @@ -3,52 +3,148 @@ package mount import ( "context" "fmt" + "sync/atomic" + "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" - "time" ) +const ( + // Default quota check interval + defaultQuotaCheckInterval = 61 * time.Second + // Faster check interval when approaching quota (within 10%) + fastQuotaCheckInterval = 5 * time.Second + // Threshold for switching to fast check (90% of quota) + quotaWarningThreshold = 0.9 +) + +// uncommittedBytes tracks bytes written locally but not yet reflected in filer statistics. +// This is used for real-time quota enforcement between periodic checks. +var uncommittedBytes int64 + +// AddUncommittedBytes adds bytes to the uncommitted write counter. +// Called when data is written to the mount. 
+func (wfs *WFS) AddUncommittedBytes(bytes int64) {
+	// Tracking is skipped entirely while no quota is configured.
+	if wfs.option.Quota > 0 {
+		atomic.AddInt64(&uncommittedBytes, bytes)
+	}
+}
+
+// SubtractUncommittedBytes subtracts bytes from the uncommitted counter and
+// clamps the result at zero. It does nothing while no quota is configured.
+// Called when data is flushed to filer or on quota refresh.
+//
+// The update is a compare-and-swap retry loop so that the subtraction and the
+// clamp are applied as one atomic step. Doing an atomic add and then a
+// separate store of zero would overwrite whatever a concurrent
+// AddUncommittedBytes added between the two operations.
+func (wfs *WFS) SubtractUncommittedBytes(bytes int64) {
+	if wfs.option.Quota <= 0 {
+		return
+	}
+	for {
+		current := atomic.LoadInt64(&uncommittedBytes)
+		next := current - bytes
+		// Don't let it go negative
+		if next < 0 {
+			next = 0
+		}
+		// Retry when another goroutine changed the counter in between.
+		if atomic.CompareAndSwapInt64(&uncommittedBytes, current, next) {
+			return
+		}
+	}
+}
+
+// ResetUncommittedBytes resets the counter after a quota check syncs with filer.
+// Unlike the add and subtract methods it is not gated on a quota being set.
+//
+// NOTE(review): uncommittedBytes is a package-level variable, so the count is
+// shared by every WFS value in the process rather than kept per mount.
+func (wfs *WFS) ResetUncommittedBytes() {
+	atomic.StoreInt64(&uncommittedBytes, 0)
+}
+
+// GetUncommittedBytes returns the current uncommitted byte count.
+func (wfs *WFS) GetUncommittedBytes() int64 {
+	return atomic.LoadInt64(&uncommittedBytes)
+}
+
+// IsOverQuotaWithUncommitted checks if quota is exceeded including uncommitted writes.
+// This provides real-time quota enforcement between periodic checks.
+func (wfs *WFS) IsOverQuotaWithUncommitted() bool {
+	// No quota configured: never over quota, whatever the other state says.
+	if wfs.option.Quota <= 0 {
+		return false
+	}
+	// The flag maintained by the periodic filer check wins outright.
+	if wfs.IsOverQuota {
+		return true
+	}
+	// Check if uncommitted writes would exceed quota
+	uncommitted := atomic.LoadInt64(&uncommittedBytes)
+	usedSize := int64(wfs.stats.UsedSize)
+	return (usedSize + uncommitted) > wfs.option.Quota
+}
+
+// loopCheckQuota refreshes the quota state from the filer for as long as the
+// process runs; it never returns. It checks once right away when a quota is
+// set, then sleeps for getQuotaCheckInterval between passes and skips the
+// check on any pass where no quota is set.
 func (wfs *WFS) loopCheckQuota() {
 
-	for {
+	// Check quota immediately on mount, don't wait for first interval.
+	// There is deliberately no early return when no quota is set: the loop
+	// below re-reads wfs.option.Quota on every pass, and returning here would
+	// turn that check into dead code and stop enforcement for good.
+	// NOTE(review): presumably wfs.option.Quota can change while mounted, as
+	// the retained in-loop check implies - confirm against whatever writes it.
+	if wfs.option.Quota > 0 {
+		wfs.checkQuotaOnce()
+	}
 
-		time.Sleep(61 * time.Second)
+	for {
+		// Adaptive interval: check more frequently when approaching quota
+		interval := wfs.getQuotaCheckInterval()
+		time.Sleep(interval)
 
 		if wfs.option.Quota <= 0 {
 			continue
 		}
 
-		err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
-
-			request := &filer_pb.StatisticsRequest{
-				Collection:  wfs.option.Collection,
-				Replication: wfs.option.Replication,
-				Ttl:         fmt.Sprintf("%ds", wfs.option.TtlSec),
-				DiskType:    string(wfs.option.DiskType),
-			}
-
-			resp, err := client.Statistics(context.Background(), request)
-			if err != nil {
-				glog.V(0).Infof("reading quota usage %v: %v", request, err)
-				return err
-			}
-			glog.V(4).Infof("read quota usage: %+v", resp)
-
-			isOverQuota := int64(resp.UsedSize) > wfs.option.Quota
-			if isOverQuota && !wfs.IsOverQuota {
-				glog.Warningf("Quota Exceeded! quota:%d used:%d", wfs.option.Quota, resp.UsedSize)
-			} else if !isOverQuota && wfs.IsOverQuota {
-				glog.Warningf("Within quota limit! quota:%d used:%d", wfs.option.Quota, resp.UsedSize)
-			}
-			wfs.IsOverQuota = isOverQuota
-
-			return nil
-		})
+		wfs.checkQuotaOnce()
+	}
+}
 
+// getQuotaCheckInterval returns the check interval based on current usage.
+// Returns a shorter interval when approaching quota limit.
+func (wfs *WFS) getQuotaCheckInterval() time.Duration {
+	if wfs.option.Quota <= 0 {
+		return defaultQuotaCheckInterval
+	}
+
+	// Cached filer usage plus bytes written locally since the last refresh.
+	usedSize := int64(wfs.stats.UsedSize)
+	uncommitted := atomic.LoadInt64(&uncommittedBytes)
+	totalUsed := usedSize + uncommitted
+
+	// If we're at 90% or more of quota, check more frequently
+	if float64(totalUsed) >= float64(wfs.option.Quota)*quotaWarningThreshold {
+		return fastQuotaCheckInterval
+	}
+	return defaultQuotaCheckInterval
+}
+
+// checkQuotaOnce asks the filer for usage statistics once. On success it
+// caches UsedSize and TotalSize in wfs.stats, retires the uncommitted bytes
+// that were counted before the request was sent, and sets wfs.IsOverQuota
+// from the reported UsedSize, logging a warning when the flag flips. A failed
+// request is logged and leaves the cached state and the counter untouched.
+func (wfs *WFS) checkQuotaOnce() {
+	err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+
+		request := &filer_pb.StatisticsRequest{
+			Collection:  wfs.option.Collection,
+			Replication: wfs.option.Replication,
+			Ttl:         fmt.Sprintf("%ds", wfs.option.TtlSec),
+			DiskType:    string(wfs.option.DiskType),
+		}
+
+		// Snapshot the counter before the request goes out. Bytes added while
+		// the request is in flight cannot be assumed to be part of the reply.
+		pending := atomic.LoadInt64(&uncommittedBytes)
+
+		resp, err := client.Statistics(context.Background(), request)
 		if err != nil {
-			glog.Warningf("read quota usage: %v", err)
+			glog.V(0).Infof("reading quota usage %v: %v", request, err)
+			return err
 		}
+		glog.V(4).Infof("read quota usage: %+v", resp)
 
-	}
+		// Update the stats cache with latest filer data
+		wfs.stats.UsedSize = resp.UsedSize
+		wfs.stats.TotalSize = resp.TotalSize
 
+		// Retire only the bytes counted before the request was sent. A plain
+		// reset to zero would also discard writes that raced with the request
+		// and would leave them unaccounted for until the next check.
+		wfs.SubtractUncommittedBytes(pending)
+
+		isOverQuota := int64(resp.UsedSize) > wfs.option.Quota
+		if isOverQuota && !wfs.IsOverQuota {
+			glog.Warningf("Quota Exceeded! quota:%d used:%d", wfs.option.Quota, resp.UsedSize)
+		} else if !isOverQuota && wfs.IsOverQuota {
+			glog.Warningf("Within quota limit! quota:%d used:%d", wfs.option.Quota, resp.UsedSize)
+		}
+		wfs.IsOverQuota = isOverQuota
+
+		return nil
+	})
+
+	if err != nil {
+		glog.Warningf("read quota usage: %v", err)
+	}
 }
diff --git a/weed/mount/weedfs_quota_test.go b/weed/mount/weedfs_quota_test.go
new file mode 100644
index 000000000..47ae30341
--- /dev/null
+++ b/weed/mount/weedfs_quota_test.go
@@ -0,0 +1,218 @@
+package mount
+
+import (
+	"sync/atomic"
+	"testing"
+)
+
+func TestUncommittedBytesTracking(t *testing.T) {
+	// Reset the global counter
+	atomic.StoreInt64(&uncommittedBytes, 0)
+
+	wfs := &WFS{
+		option: &Option{
+			Quota: 100 * 1024 * 1024, // 100MB
+		},
+	}
+
+	// Test AddUncommittedBytes
+	wfs.AddUncommittedBytes(1024)
+	if got := wfs.GetUncommittedBytes(); got != 1024 {
+		t.Errorf("AddUncommittedBytes: got %d, want 1024", got)
+	}
+
+	// Test accumulation
+	wfs.AddUncommittedBytes(2048)
+	if got := wfs.GetUncommittedBytes(); got != 3072 {
+		t.Errorf("AddUncommittedBytes accumulation: got %d, want 3072", got)
+	}
+
+	// Test SubtractUncommittedBytes
+	wfs.SubtractUncommittedBytes(1024)
+	if got := wfs.GetUncommittedBytes(); got != 2048 {
+		t.Errorf("SubtractUncommittedBytes: got %d, want 2048", got)
+	}
+
+	// Test ResetUncommittedBytes
+	wfs.ResetUncommittedBytes()
+	if got := wfs.GetUncommittedBytes(); got != 0 {
+		t.Errorf("ResetUncommittedBytes: got %d, want 0", got)
+	}
+}
+
+func TestUncommittedBytesDoesNotGoNegative(t *testing.T) {
+	atomic.StoreInt64(&uncommittedBytes, 0)
+
+	wfs := &WFS{
+		option: &Option{
+			Quota: 100 * 1024 * 1024,
+		},
+	}
+
+	wfs.AddUncommittedBytes(100)
+	wfs.SubtractUncommittedBytes(200) // Try to subtract more than available
+
+	if got := wfs.GetUncommittedBytes(); got < 0 {
+		t.Errorf("uncommittedBytes went negative: got %d", got)
+	}
+}
+
+func TestIsOverQuotaWithUncommitted(t *testing.T) {
+	atomic.StoreInt64(&uncommittedBytes, 0)
+
+	tests := []struct {
+		name        string
+		quota       int64
+		usedSize    uint64
+		uncommitted int64
+		isOverQuota bool
+		want        bool
+	}{
+		{
+			name:        "no quota set",
+			quota:       0,
+			usedSize:    1000,
+			uncommitted: 1000,
+			isOverQuota: false,
+			want:        false,
+		},
+		{
+			name:        "under quota",
+			quota:       1000,
+			usedSize:    400,
+			uncommitted: 400,
+			isOverQuota: false,
+			want:        false,
+		},
+		{
+			name:        "over quota with uncommitted",
+			quota:       1000,
+			usedSize:    600,
+			uncommitted: 500,
+			isOverQuota: false,
+			want:        true,
+		},
+		{
+			name:        "already over quota flag set",
+			quota:       1000,
+			usedSize:    500,
+			uncommitted: 0,
+			isOverQuota: true,
+			want:        true,
+		},
+		{
+			name:        "exactly at quota",
+			quota:       1000,
+			usedSize:    500,
+			uncommitted: 500,
+			isOverQuota: false,
+			want:        false,
+		},
+		{
+			name:        "one byte over quota",
+			quota:       1000,
+			usedSize:    500,
+			uncommitted: 501,
+			isOverQuota: false,
+			want:        true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			atomic.StoreInt64(&uncommittedBytes, tt.uncommitted)
+
+			wfs := &WFS{
+				option: &Option{
+					Quota: tt.quota,
+				},
+				IsOverQuota: tt.isOverQuota,
+			}
+			wfs.stats.UsedSize = tt.usedSize
+
+			got := wfs.IsOverQuotaWithUncommitted()
+			if got != tt.want {
+				t.Errorf("IsOverQuotaWithUncommitted() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func TestGetQuotaCheckInterval(t *testing.T) {
+	atomic.StoreInt64(&uncommittedBytes, 0)
+
+	tests := []struct {
+		name        string
+		quota       int64
+		usedSize    uint64
+		uncommitted int64
+		wantFast    bool
+	}{
+		{
+			name:        "no quota",
+			quota:       0,
+			usedSize:    0,
+			uncommitted: 0,
+			wantFast:    false,
+		},
+		{
+			name:        "under 90% threshold",
+			quota:       1000,
+			usedSize:    800,
+			uncommitted: 0,
+			wantFast:    false,
+		},
+		{
+			name:        "at 90% threshold",
+			quota:       1000,
+			usedSize:    900,
+			uncommitted: 0,
+			wantFast:    true,
+		},
+		{
+			name:        "over 90% with uncommitted",
+			quota:       1000,
+			usedSize:    500,
+			uncommitted: 410,
+			wantFast:    true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			atomic.StoreInt64(&uncommittedBytes, tt.uncommitted)
+
+			wfs := &WFS{
+				option: &Option{
+					Quota: tt.quota,
+				},
+			}
+			wfs.stats.UsedSize = tt.usedSize
+
+			got := wfs.getQuotaCheckInterval()
+			if tt.wantFast && got != fastQuotaCheckInterval {
+				t.Errorf("getQuotaCheckInterval() = %v, want fast interval %v", got, fastQuotaCheckInterval)
+			}
+			if !tt.wantFast && got != defaultQuotaCheckInterval {
+				t.Errorf("getQuotaCheckInterval() = %v, want default interval %v", got, defaultQuotaCheckInterval)
+			}
+		})
+	}
+}
+
+func TestNoQuotaTrackingWhenDisabled(t *testing.T) {
+	atomic.StoreInt64(&uncommittedBytes, 0)
+
+	wfs := &WFS{
+		option: &Option{
+			Quota: 0, // No quota
+		},
+	}
+
+	// Should not track when quota is disabled
+	wfs.AddUncommittedBytes(1000)
+	if got := wfs.GetUncommittedBytes(); got != 0 {
+		t.Errorf("Should not track uncommitted bytes when quota disabled: got %d", got)
+	}
+}
+
diff --git a/weed/mount/weedfs_rename.go b/weed/mount/weedfs_rename.go
index e567b12e1..b52e312d4 100644
--- a/weed/mount/weedfs_rename.go
+++ b/weed/mount/weedfs_rename.go
@@ -132,7 +132,7 @@ const (
 )
 
 func (wfs *WFS) Rename(cancel <-chan struct{}, in *fuse.RenameIn, oldName string, newName string) (code fuse.Status) {
-	if wfs.IsOverQuota {
+	if wfs.IsOverQuotaWithUncommitted() {
 		return fuse.Status(syscall.ENOSPC)
 	}
 
diff --git a/weed/mount/weedfs_symlink.go b/weed/mount/weedfs_symlink.go
index 80081c31c..a0e2195d6 100644
--- a/weed/mount/weedfs_symlink.go
+++ b/weed/mount/weedfs_symlink.go
@@ -17,7 +17,7 @@ import (
 
 /** Create a symbolic link */
 func (wfs *WFS) Symlink(cancel <-chan struct{}, header *fuse.InHeader, target string, name string, out *fuse.EntryOut) (code fuse.Status) {
-	if wfs.IsOverQuota {
+	if wfs.IsOverQuotaWithUncommitted() {
 		return fuse.Status(syscall.ENOSPC)
 	}
 	if s := checkName(name); s != fuse.OK {
diff --git a/weed/mount/weedfs_xattr.go b/weed/mount/weedfs_xattr.go
index 78acaafc8..6360164a5 100644
--- a/weed/mount/weedfs_xattr.go
+++ b/weed/mount/weedfs_xattr.go
@@ -83,7 +83,7 @@ func (wfs *WFS) SetXAttr(cancel <-chan struct{}, input *fuse.SetXAttrIn, attr st
 		return fuse.Status(syscall.ENOTSUP)
 	}
 
-	if wfs.IsOverQuota {
+	if wfs.IsOverQuotaWithUncommitted() {
 		return fuse.Status(syscall.ENOSPC)
 	}
 