diff --git a/weed/command/filer.go b/weed/command/filer.go index 053c5a147..f26ff9134 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -42,38 +42,39 @@ var ( ) type FilerOptions struct { - masters *pb.ServerDiscovery - mastersString *string - ip *string - bindIp *string - port *int - portGrpc *int - publicPort *int - filerGroup *string - collection *string - defaultReplicaPlacement *string - disableDirListing *bool - maxMB *int - dirListingLimit *int - dataCenter *string - rack *string - enableNotification *bool - disableHttp *bool - cipher *bool - metricsHttpPort *int - metricsHttpIp *string - saveToFilerLimit *int - defaultLevelDbDirectory *string - concurrentUploadLimitMB *int - debug *bool - debugPort *int - localSocket *string - showUIDirectoryDelete *bool - downloadMaxMBps *int - diskType *string - allowedOrigins *string - exposeDirectoryData *bool - certProvider certprovider.Provider + masters *pb.ServerDiscovery + mastersString *string + ip *string + bindIp *string + port *int + portGrpc *int + publicPort *int + filerGroup *string + collection *string + defaultReplicaPlacement *string + disableDirListing *bool + maxMB *int + dirListingLimit *int + dataCenter *string + rack *string + enableNotification *bool + disableHttp *bool + cipher *bool + metricsHttpPort *int + metricsHttpIp *string + saveToFilerLimit *int + defaultLevelDbDirectory *string + concurrentUploadLimitMB *int + concurrentFileUploadLimit *int + debug *bool + debugPort *int + localSocket *string + showUIDirectoryDelete *bool + downloadMaxMBps *int + diskType *string + allowedOrigins *string + exposeDirectoryData *bool + certProvider certprovider.Provider } func init() { @@ -99,6 +100,7 @@ func init() { f.saveToFilerLimit = cmdFiler.Flag.Int("saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store") f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory") f.concurrentUploadLimitMB = cmdFiler.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size") + f.concurrentFileUploadLimit = cmdFiler.Flag.Int("concurrentFileUploadLimit", 0, "limit number of concurrent file uploads, 0 means unlimited") f.debug = cmdFiler.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:/debug/pprof/goroutine?debug=2") f.debugPort = cmdFiler.Flag.Int("debug.port", 6060, "http port for debugging") f.localSocket = cmdFiler.Flag.String("localSocket", "", "default to /tmp/seaweedfs-filer-.sock") @@ -310,25 +312,26 @@ func (fo *FilerOptions) startFiler() { filerAddress := pb.NewServerAddress(*fo.ip, *fo.port, *fo.portGrpc) fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux, &weed_server.FilerOption{ - Masters: fo.masters, - FilerGroup: *fo.filerGroup, - Collection: *fo.collection, - DefaultReplication: *fo.defaultReplicaPlacement, - DisableDirListing: *fo.disableDirListing, - MaxMB: *fo.maxMB, - DirListingLimit: *fo.dirListingLimit, - DataCenter: *fo.dataCenter, - Rack: *fo.rack, - DefaultLevelDbDir: defaultLevelDbDirectory, - DisableHttp: *fo.disableHttp, - Host: filerAddress, - Cipher: *fo.cipher, - SaveToFilerLimit: int64(*fo.saveToFilerLimit), - ConcurrentUploadLimit: int64(*fo.concurrentUploadLimitMB) * 1024 * 1024, - ShowUIDirectoryDelete: *fo.showUIDirectoryDelete, - DownloadMaxBytesPs: int64(*fo.downloadMaxMBps) * 1024 * 1024, - DiskType: *fo.diskType, - AllowedOrigins: strings.Split(*fo.allowedOrigins, ","), + Masters: fo.masters, + FilerGroup: *fo.filerGroup, + Collection: *fo.collection, + DefaultReplication: *fo.defaultReplicaPlacement, + DisableDirListing: *fo.disableDirListing, + MaxMB: *fo.maxMB, + DirListingLimit: *fo.dirListingLimit, + DataCenter: *fo.dataCenter, + Rack: *fo.rack, + DefaultLevelDbDir: defaultLevelDbDirectory, + DisableHttp: *fo.disableHttp, + Host: filerAddress, + Cipher: *fo.cipher, + SaveToFilerLimit: int64(*fo.saveToFilerLimit), + ConcurrentUploadLimit: int64(*fo.concurrentUploadLimitMB) * 1024 * 1024, + ConcurrentFileUploadLimit: int64(*fo.concurrentFileUploadLimit), + ShowUIDirectoryDelete: *fo.showUIDirectoryDelete, + DownloadMaxBytesPs: int64(*fo.downloadMaxMBps) * 1024 * 1024, + DiskType: *fo.diskType, + AllowedOrigins: strings.Split(*fo.allowedOrigins, ","), }) if nfs_err != nil { glog.Fatalf("Filer startup error: %v", nfs_err) diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 3d08c0980..95d344af4 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -56,32 +56,34 @@ import ( ) type FilerOption struct { - Masters *pb.ServerDiscovery - FilerGroup string - Collection string - DefaultReplication string - DisableDirListing bool - MaxMB int - DirListingLimit int - DataCenter string - Rack string - DataNode string - DefaultLevelDbDir string - DisableHttp bool - Host pb.ServerAddress - recursiveDelete bool - Cipher bool - SaveToFilerLimit int64 - ConcurrentUploadLimit int64 - ShowUIDirectoryDelete bool - DownloadMaxBytesPs int64 - DiskType string - AllowedOrigins []string - ExposeDirectoryData bool + Masters *pb.ServerDiscovery + FilerGroup string + Collection string + DefaultReplication string + DisableDirListing bool + MaxMB int + DirListingLimit int + DataCenter string + Rack string + DataNode string + DefaultLevelDbDir string + DisableHttp bool + Host pb.ServerAddress + recursiveDelete bool + Cipher bool + SaveToFilerLimit int64 + ConcurrentUploadLimit int64 + ConcurrentFileUploadLimit int64 + ShowUIDirectoryDelete bool + DownloadMaxBytesPs int64 + DiskType string + AllowedOrigins []string + ExposeDirectoryData bool } type FilerServer struct { inFlightDataSize int64 + inFlightUploads int64 listenersWaits int64 // notifying clients diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go index dcfc8e3ed..a2eab9365 100644 --- a/weed/server/filer_server_handlers.go +++ b/weed/server/filer_server_handlers.go @@ -95,14 +95,28 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { contentLength := getContentLength(r) fs.inFlightDataLimitCond.L.Lock() inFlightDataSize := atomic.LoadInt64(&fs.inFlightDataSize) - for fs.option.ConcurrentUploadLimit != 0 && inFlightDataSize > fs.option.ConcurrentUploadLimit { - glog.V(4).Infof("wait because inflight data %d > %d", inFlightDataSize, fs.option.ConcurrentUploadLimit) + inFlightUploads := atomic.LoadInt64(&fs.inFlightUploads) + + // Wait if either data size limit or file count limit is exceeded + for (fs.option.ConcurrentUploadLimit != 0 && inFlightDataSize > fs.option.ConcurrentUploadLimit) || (fs.option.ConcurrentFileUploadLimit != 0 && inFlightUploads >= fs.option.ConcurrentFileUploadLimit) { + if (fs.option.ConcurrentUploadLimit != 0 && inFlightDataSize > fs.option.ConcurrentUploadLimit) { + glog.V(4).Infof("wait because inflight data %d > %d", inFlightDataSize, fs.option.ConcurrentUploadLimit) + } + if (fs.option.ConcurrentFileUploadLimit != 0 && inFlightUploads >= fs.option.ConcurrentFileUploadLimit) { + glog.V(4).Infof("wait because inflight uploads %d >= %d", inFlightUploads, fs.option.ConcurrentFileUploadLimit) + } fs.inFlightDataLimitCond.Wait() inFlightDataSize = atomic.LoadInt64(&fs.inFlightDataSize) + inFlightUploads = atomic.LoadInt64(&fs.inFlightUploads) } fs.inFlightDataLimitCond.L.Unlock() + + // Increment counters + atomic.AddInt64(&fs.inFlightUploads, 1) atomic.AddInt64(&fs.inFlightDataSize, contentLength) defer func() { + // Decrement counters + atomic.AddInt64(&fs.inFlightUploads, -1) atomic.AddInt64(&fs.inFlightDataSize, -contentLength) fs.inFlightDataLimitCond.Signal() }()