Browse Source

filer: add ConcurrentFileUploadLimit option to limit number of concurrent uploads

This adds a new configuration option ConcurrentFileUploadLimit that limits
the number of concurrent file uploads based on file count, complementing
the existing ConcurrentUploadLimit which limits based on total data size.

This addresses an OOM vulnerability where requests with missing/zero
Content-Length headers could bypass the size-based rate limiter.

Changes:
- Add ConcurrentFileUploadLimit field to FilerOption
- Add inFlightUploads counter to FilerServer
- Update upload handler to check both size and count limits
- Add -concurrentFileUploadLimit command line flag (default: 0 = unlimited)

Fixes #7529
pull/7554/head
Chris Lu 1 week ago
parent
commit
374196178a
  1. 3
      weed/command/filer.go
  2. 2
      weed/server/filer_server.go
  3. 16
      weed/server/filer_server_handlers.go

3
weed/command/filer.go

@ -65,6 +65,7 @@ type FilerOptions struct {
saveToFilerLimit *int
defaultLevelDbDirectory *string
concurrentUploadLimitMB *int
concurrentFileUploadLimit *int
debug *bool
debugPort *int
localSocket *string
@ -99,6 +100,7 @@ func init() {
f.saveToFilerLimit = cmdFiler.Flag.Int("saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store")
f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory")
f.concurrentUploadLimitMB = cmdFiler.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size")
f.concurrentFileUploadLimit = cmdFiler.Flag.Int("concurrentFileUploadLimit", 0, "limit number of concurrent file uploads, 0 means unlimited")
f.debug = cmdFiler.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:<debug.port>/debug/pprof/goroutine?debug=2")
f.debugPort = cmdFiler.Flag.Int("debug.port", 6060, "http port for debugging")
f.localSocket = cmdFiler.Flag.String("localSocket", "", "default to /tmp/seaweedfs-filer-<port>.sock")
@ -325,6 +327,7 @@ func (fo *FilerOptions) startFiler() {
Cipher: *fo.cipher,
SaveToFilerLimit: int64(*fo.saveToFilerLimit),
ConcurrentUploadLimit: int64(*fo.concurrentUploadLimitMB) * 1024 * 1024,
ConcurrentFileUploadLimit: int64(*fo.concurrentFileUploadLimit),
ShowUIDirectoryDelete: *fo.showUIDirectoryDelete,
DownloadMaxBytesPs: int64(*fo.downloadMaxMBps) * 1024 * 1024,
DiskType: *fo.diskType,

2
weed/server/filer_server.go

@ -73,6 +73,7 @@ type FilerOption struct {
Cipher bool
SaveToFilerLimit int64
ConcurrentUploadLimit int64
ConcurrentFileUploadLimit int64
ShowUIDirectoryDelete bool
DownloadMaxBytesPs int64
DiskType string
@ -82,6 +83,7 @@ type FilerOption struct {
type FilerServer struct {
inFlightDataSize int64
inFlightUploads int64
listenersWaits int64
// notifying clients

16
weed/server/filer_server_handlers.go

@ -95,14 +95,28 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
contentLength := getContentLength(r)
fs.inFlightDataLimitCond.L.Lock()
inFlightDataSize := atomic.LoadInt64(&fs.inFlightDataSize)
for fs.option.ConcurrentUploadLimit != 0 && inFlightDataSize > fs.option.ConcurrentUploadLimit {
inFlightUploads := atomic.LoadInt64(&fs.inFlightUploads)
// Wait if either data size limit or file count limit is exceeded
for (fs.option.ConcurrentUploadLimit != 0 && inFlightDataSize > fs.option.ConcurrentUploadLimit) || (fs.option.ConcurrentFileUploadLimit != 0 && inFlightUploads >= fs.option.ConcurrentFileUploadLimit) {
if (fs.option.ConcurrentUploadLimit != 0 && inFlightDataSize > fs.option.ConcurrentUploadLimit) {
glog.V(4).Infof("wait because inflight data %d > %d", inFlightDataSize, fs.option.ConcurrentUploadLimit)
}
if (fs.option.ConcurrentFileUploadLimit != 0 && inFlightUploads >= fs.option.ConcurrentFileUploadLimit) {
glog.V(4).Infof("wait because inflight uploads %d >= %d", inFlightUploads, fs.option.ConcurrentFileUploadLimit)
}
fs.inFlightDataLimitCond.Wait()
inFlightDataSize = atomic.LoadInt64(&fs.inFlightDataSize)
inFlightUploads = atomic.LoadInt64(&fs.inFlightUploads)
}
fs.inFlightDataLimitCond.L.Unlock()
// Increment counters
atomic.AddInt64(&fs.inFlightUploads, 1)
atomic.AddInt64(&fs.inFlightDataSize, contentLength)
defer func() {
// Decrement counters
atomic.AddInt64(&fs.inFlightUploads, -1)
atomic.AddInt64(&fs.inFlightDataSize, -contentLength)
fs.inFlightDataLimitCond.Signal()
}()

Loading…
Cancel
Save