From 374196178a06fe630738110d9d9c90bec1189455 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 26 Nov 2025 09:54:23 -0800 Subject: [PATCH] filer: add ConcurrentFileUploadLimit option to limit number of concurrent uploads This adds a new configuration option ConcurrentFileUploadLimit that limits the number of concurrent file uploads based on file count, complementing the existing ConcurrentUploadLimit which limits based on total data size. This addresses an OOM vulnerability where requests with missing/zero Content-Length headers could bypass the size-based rate limiter. Changes: - Add ConcurrentFileUploadLimit field to FilerOption - Add inFlightUploads counter to FilerServer - Update upload handler to check both size and count limits - Add -concurrentFileUploadLimit command line flag (default: 0 = unlimited) Fixes #7529 --- weed/command/filer.go | 105 ++++++++++++++------------- weed/server/filer_server.go | 46 ++++++------ weed/server/filer_server_handlers.go | 18 ++++- 3 files changed, 94 insertions(+), 75 deletions(-) diff --git a/weed/command/filer.go b/weed/command/filer.go index 053c5a147..f26ff9134 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -42,38 +42,39 @@ var ( ) type FilerOptions struct { - masters *pb.ServerDiscovery - mastersString *string - ip *string - bindIp *string - port *int - portGrpc *int - publicPort *int - filerGroup *string - collection *string - defaultReplicaPlacement *string - disableDirListing *bool - maxMB *int - dirListingLimit *int - dataCenter *string - rack *string - enableNotification *bool - disableHttp *bool - cipher *bool - metricsHttpPort *int - metricsHttpIp *string - saveToFilerLimit *int - defaultLevelDbDirectory *string - concurrentUploadLimitMB *int - debug *bool - debugPort *int - localSocket *string - showUIDirectoryDelete *bool - downloadMaxMBps *int - diskType *string - allowedOrigins *string - exposeDirectoryData *bool - certProvider certprovider.Provider + masters *pb.ServerDiscovery + 
mastersString *string + ip *string + bindIp *string + port *int + portGrpc *int + publicPort *int + filerGroup *string + collection *string + defaultReplicaPlacement *string + disableDirListing *bool + maxMB *int + dirListingLimit *int + dataCenter *string + rack *string + enableNotification *bool + disableHttp *bool + cipher *bool + metricsHttpPort *int + metricsHttpIp *string + saveToFilerLimit *int + defaultLevelDbDirectory *string + concurrentUploadLimitMB *int + concurrentFileUploadLimit *int + debug *bool + debugPort *int + localSocket *string + showUIDirectoryDelete *bool + downloadMaxMBps *int + diskType *string + allowedOrigins *string + exposeDirectoryData *bool + certProvider certprovider.Provider } func init() { @@ -99,6 +100,7 @@ func init() { f.saveToFilerLimit = cmdFiler.Flag.Int("saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store") f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory") f.concurrentUploadLimitMB = cmdFiler.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size") + f.concurrentFileUploadLimit = cmdFiler.Flag.Int("concurrentFileUploadLimit", 0, "limit number of concurrent file uploads, 0 means unlimited") f.debug = cmdFiler.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:/debug/pprof/goroutine?debug=2") f.debugPort = cmdFiler.Flag.Int("debug.port", 6060, "http port for debugging") f.localSocket = cmdFiler.Flag.String("localSocket", "", "default to /tmp/seaweedfs-filer-.sock") @@ -310,25 +312,26 @@ func (fo *FilerOptions) startFiler() { filerAddress := pb.NewServerAddress(*fo.ip, *fo.port, *fo.portGrpc) fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux, &weed_server.FilerOption{ - Masters: fo.masters, - FilerGroup: *fo.filerGroup, - Collection: *fo.collection, - DefaultReplication: *fo.defaultReplicaPlacement, - DisableDirListing: 
*fo.disableDirListing, - MaxMB: *fo.maxMB, - DirListingLimit: *fo.dirListingLimit, - DataCenter: *fo.dataCenter, - Rack: *fo.rack, - DefaultLevelDbDir: defaultLevelDbDirectory, - DisableHttp: *fo.disableHttp, - Host: filerAddress, - Cipher: *fo.cipher, - SaveToFilerLimit: int64(*fo.saveToFilerLimit), - ConcurrentUploadLimit: int64(*fo.concurrentUploadLimitMB) * 1024 * 1024, - ShowUIDirectoryDelete: *fo.showUIDirectoryDelete, - DownloadMaxBytesPs: int64(*fo.downloadMaxMBps) * 1024 * 1024, - DiskType: *fo.diskType, - AllowedOrigins: strings.Split(*fo.allowedOrigins, ","), + Masters: fo.masters, + FilerGroup: *fo.filerGroup, + Collection: *fo.collection, + DefaultReplication: *fo.defaultReplicaPlacement, + DisableDirListing: *fo.disableDirListing, + MaxMB: *fo.maxMB, + DirListingLimit: *fo.dirListingLimit, + DataCenter: *fo.dataCenter, + Rack: *fo.rack, + DefaultLevelDbDir: defaultLevelDbDirectory, + DisableHttp: *fo.disableHttp, + Host: filerAddress, + Cipher: *fo.cipher, + SaveToFilerLimit: int64(*fo.saveToFilerLimit), + ConcurrentUploadLimit: int64(*fo.concurrentUploadLimitMB) * 1024 * 1024, + ConcurrentFileUploadLimit: int64(*fo.concurrentFileUploadLimit), + ShowUIDirectoryDelete: *fo.showUIDirectoryDelete, + DownloadMaxBytesPs: int64(*fo.downloadMaxMBps) * 1024 * 1024, + DiskType: *fo.diskType, + AllowedOrigins: strings.Split(*fo.allowedOrigins, ","), }) if nfs_err != nil { glog.Fatalf("Filer startup error: %v", nfs_err) diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 3d08c0980..95d344af4 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -56,32 +56,34 @@ import ( ) type FilerOption struct { - Masters *pb.ServerDiscovery - FilerGroup string - Collection string - DefaultReplication string - DisableDirListing bool - MaxMB int - DirListingLimit int - DataCenter string - Rack string - DataNode string - DefaultLevelDbDir string - DisableHttp bool - Host pb.ServerAddress - recursiveDelete bool - Cipher bool - 
SaveToFilerLimit int64 - ConcurrentUploadLimit int64 - ShowUIDirectoryDelete bool - DownloadMaxBytesPs int64 - DiskType string - AllowedOrigins []string - ExposeDirectoryData bool + Masters *pb.ServerDiscovery + FilerGroup string + Collection string + DefaultReplication string + DisableDirListing bool + MaxMB int + DirListingLimit int + DataCenter string + Rack string + DataNode string + DefaultLevelDbDir string + DisableHttp bool + Host pb.ServerAddress + recursiveDelete bool + Cipher bool + SaveToFilerLimit int64 + ConcurrentUploadLimit int64 + ConcurrentFileUploadLimit int64 + ShowUIDirectoryDelete bool + DownloadMaxBytesPs int64 + DiskType string + AllowedOrigins []string + ExposeDirectoryData bool } type FilerServer struct { inFlightDataSize int64 + inFlightUploads int64 listenersWaits int64 // notifying clients diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go index dcfc8e3ed..a2eab9365 100644 --- a/weed/server/filer_server_handlers.go +++ b/weed/server/filer_server_handlers.go @@ -95,14 +95,28 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { contentLength := getContentLength(r) fs.inFlightDataLimitCond.L.Lock() inFlightDataSize := atomic.LoadInt64(&fs.inFlightDataSize) - for fs.option.ConcurrentUploadLimit != 0 && inFlightDataSize > fs.option.ConcurrentUploadLimit { - glog.V(4).Infof("wait because inflight data %d > %d", inFlightDataSize, fs.option.ConcurrentUploadLimit) + inFlightUploads := atomic.LoadInt64(&fs.inFlightUploads) + + // Wait if either data size limit or file count limit is exceeded + for (fs.option.ConcurrentUploadLimit != 0 && inFlightDataSize > fs.option.ConcurrentUploadLimit) || (fs.option.ConcurrentFileUploadLimit != 0 && inFlightUploads >= fs.option.ConcurrentFileUploadLimit) { + if fs.option.ConcurrentUploadLimit != 0 && inFlightDataSize > fs.option.ConcurrentUploadLimit { + glog.V(4).Infof("wait because inflight data %d > %d", inFlightDataSize, 
fs.option.ConcurrentUploadLimit) + } + if fs.option.ConcurrentFileUploadLimit != 0 && inFlightUploads >= fs.option.ConcurrentFileUploadLimit { + glog.V(4).Infof("wait because inflight uploads %d >= %d", inFlightUploads, fs.option.ConcurrentFileUploadLimit) + } fs.inFlightDataLimitCond.Wait() inFlightDataSize = atomic.LoadInt64(&fs.inFlightDataSize) + inFlightUploads = atomic.LoadInt64(&fs.inFlightUploads) } fs.inFlightDataLimitCond.L.Unlock() + + // Increment counters + atomic.AddInt64(&fs.inFlightUploads, 1) atomic.AddInt64(&fs.inFlightDataSize, contentLength) defer func() { + // Decrement counters + atomic.AddInt64(&fs.inFlightUploads, -1) atomic.AddInt64(&fs.inFlightDataSize, -contentLength) fs.inFlightDataLimitCond.Signal() }()