@ -1,10 +1,12 @@
package weed_server
package weed_server
import (
import (
"fmt"
"net/http"
"net/http"
"strconv"
"strconv"
"strings"
"strings"
"sync/atomic"
"sync/atomic"
"time"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util"
@ -55,22 +57,32 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
vs . guard . WhiteList ( vs . DeleteHandler ) ( w , r )
vs . guard . WhiteList ( vs . DeleteHandler ) ( w , r )
case "PUT" , "POST" :
case "PUT" , "POST" :
// wait until in flight data is less than the limit
contentLength := getContentLength ( r )
contentLength := getContentLength ( r )
// exclude the replication from the concurrentUploadLimitMB
if r . URL . Query ( ) . Get ( "type" ) != "replicate" { //Non-Replication
startTime := time . Now ( )
vs . inFlightUploadDataLimitCond . L . Lock ( )
vs . inFlightUploadDataLimitCond . L . Lock ( )
for vs . concurrentUploadLimit != 0 && atomic . LoadInt64 ( & vs . inFlightUploadDataSize ) > vs . concurrentUploadLimit {
// exclude the replication from the concurrentUploadLimitMB
if r . URL . Query ( ) . Get ( "type" ) != "replicate" {
for vs . concurrentUploadLimit != 0 && vs . inFlightUploadDataSize > vs . concurrentUploadLimit {
//wait timeout
if startTime . Add ( vs . inflightUploadDataTimeout ) . Before ( time . Now ( ) ) {
err := fmt . Errorf ( "reject because inflight upload data %d > %d, and wait timeout" , vs . inFlightUploadDataSize , vs . concurrentUploadLimit )
vs . inFlightUploadDataLimitCond . L . Unlock ( )
glog . V ( 1 ) . Infof ( "too many requests: %v" , err )
writeJsonError ( w , r , http . StatusTooManyRequests , err )
return
}
glog . V ( 4 ) . Infof ( "wait because inflight upload data %d > %d" , vs . inFlightUploadDataSize , vs . concurrentUploadLimit )
glog . V ( 4 ) . Infof ( "wait because inflight upload data %d > %d" , vs . inFlightUploadDataSize , vs . concurrentUploadLimit )
vs . inFlightUploadDataLimitCond . Wait ( )
vs . inFlightUploadDataLimitCond . Wait ( )
}
}
vs . inFlightUploadDataLimitCond . L . Unlock ( )
}
}
vs . inFlightUploadDataSize += contentLength
vs . inFlightUploadDataLimitCond . L . Unlock ( )
atomic . AddInt64 ( & vs . inFlightUploadDataSize , contentLength )
defer func ( ) {
defer func ( ) {
atomic . AddInt64 ( & vs . inFlightUploadDataSize , - contentLength )
vs . inFlightUploadDataLimitCond . L . Lock ( )
vs . inFlightUploadDataSize -= contentLength
vs . inFlightUploadDataLimitCond . L . Unlock ( )
vs . inFlightUploadDataLimitCond . Signal ( )
vs . inFlightUploadDataLimitCond . Signal ( )
} ( )
} ( )