Browse Source

refactoring

pull/1960/head
Chris Lu 4 years ago
parent
commit
7e8edc3c4a
  1. 4
      weed/filesys/wfs.go
  2. 61
      weed/operation/upload_processor.go
  3. 8
      weed/server/filer_server_handlers_write_upload.go
  4. 78
      weed/util/limiter.go

4
weed/filesys/wfs.go

@ -77,7 +77,7 @@ type WFS struct {
signature int32 signature int32
// throttle writers // throttle writers
concurrentWriters *util.LimitedConcurrentExecutor
concurrentWriters *util.LimitedOutOfOrderProcessor
Server *fs.Server Server *fs.Server
} }
type statsCache struct { type statsCache struct {
@ -135,7 +135,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
wfs.fsNodeCache = newFsCache(wfs.root) wfs.fsNodeCache = newFsCache(wfs.root)
if wfs.option.ConcurrentWriters > 0 { if wfs.option.ConcurrentWriters > 0 {
wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
wfs.concurrentWriters = util.NewLimitedOutOfOrderProcessor(int32(wfs.option.ConcurrentWriters))
} }
return wfs return wfs

61
weed/operation/upload_processor.go

@ -1,61 +0,0 @@
package operation
import (
"reflect"
"runtime"
"sync"
"sync/atomic"
)
type OperationRequest func()
var (
requestSlots = uint32(32)
requests = make([]chan OperationRequest, requestSlots) // increase slots to increase fairness
ConcurrentUploadLimit = int32(runtime.NumCPU()) // directly related to memory usage
concurrentLimitCond = sync.NewCond(new(sync.Mutex))
concurrentUpload int32
)
func init() {
for i := 0; i < int(requestSlots); i++ {
requests[i] = make(chan OperationRequest)
}
cases := make([]reflect.SelectCase, requestSlots)
for i, ch := range requests {
cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}
go func() {
for {
_, value, ok := reflect.Select(cases)
if !ok {
continue
}
request := value.Interface().(OperationRequest)
concurrentLimitCond.L.Lock()
for atomic.LoadInt32(&concurrentUpload) > ConcurrentUploadLimit {
concurrentLimitCond.Wait()
}
atomic.AddInt32(&concurrentUpload, 1)
concurrentLimitCond.L.Unlock()
go func() {
defer atomic.AddInt32(&concurrentUpload, -1)
defer concurrentLimitCond.Signal()
request()
}()
}
}()
}
func AsyncOutOfOrderProcess(slotKey uint32, request OperationRequest) {
index := slotKey % requestSlots
requests[index] <- request
}

8
weed/server/filer_server_handlers_write_upload.go

@ -5,8 +5,8 @@ import (
"hash" "hash"
"io" "io"
"io/ioutil" "io/ioutil"
"math/rand"
"net/http" "net/http"
"runtime"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -20,6 +20,10 @@ import (
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
var (
limitedUploadProcessor = util.NewLimitedOutOfOrderProcessor(int32(runtime.NumCPU()))
)
func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, dataSize int64, err error, smallContent []byte) { func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, dataSize int64, err error, smallContent []byte) {
md5Hash = md5.New() md5Hash = md5.New()
@ -58,7 +62,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
for readErr == nil { for readErr == nil {
wg.Add(1) wg.Add(1)
operation.AsyncOutOfOrderProcess(rand.Uint32(), func() {
limitedUploadProcessor.Execute(func() {
defer wg.Done() defer wg.Done()
var localOffset int64 var localOffset int64

78
weed/util/limiter.go

@ -1,40 +1,70 @@
package util package util
// initial version comes from https://github.com/korovkin/limiter/blob/master/limiter.go
import (
"math/rand"
"reflect"
"sync"
"sync/atomic"
)
// LimitedConcurrentExecutor object
type LimitedConcurrentExecutor struct {
limit int
tokenChan chan int
type OperationRequest func()
type LimitedOutOfOrderProcessor struct {
processorSlots uint32
processors []chan OperationRequest
processorLimit int32
processorLimitCond *sync.Cond
currentProcessor int32
} }
func NewLimitedConcurrentExecutor(limit int) *LimitedConcurrentExecutor {
func NewLimitedOutOfOrderProcessor(limit int32) (c *LimitedOutOfOrderProcessor) {
// allocate a limiter instance
c := &LimitedConcurrentExecutor{
limit: limit,
tokenChan: make(chan int, limit),
processorSlots := uint32(32)
c = &LimitedOutOfOrderProcessor{
processorSlots: processorSlots,
processors: make([]chan OperationRequest, processorSlots),
processorLimit: limit,
processorLimitCond: sync.NewCond(new(sync.Mutex)),
} }
// allocate the tokenChan:
for i := 0; i < c.limit; i++ {
c.tokenChan <- i
for i := 0; i < int(processorSlots); i++ {
c.processors[i] = make(chan OperationRequest)
} }
return c
cases := make([]reflect.SelectCase, processorSlots)
for i, ch := range c.processors {
cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}
go func() {
for {
_, value, ok := reflect.Select(cases)
if !ok {
continue
}
request := value.Interface().(OperationRequest)
c.processorLimitCond.L.Lock()
for atomic.LoadInt32(&c.currentProcessor) > c.processorLimit {
c.processorLimitCond.Wait()
} }
atomic.AddInt32(&c.currentProcessor, 1)
c.processorLimitCond.L.Unlock()
// Execute adds a function to the execution queue.
// if num of go routines allocated by this instance is < limit
// launch a new go routine to execute job
// else wait until a go routine becomes available
func (c *LimitedConcurrentExecutor) Execute(job func()) {
token := <-c.tokenChan
go func() { go func() {
defer func() {
c.tokenChan <- token
defer atomic.AddInt32(&c.currentProcessor, -1)
defer c.processorLimitCond.Signal()
request()
}() }()
// run the job
job()
}
}() }()
return c
}
func (c *LimitedOutOfOrderProcessor) Execute(request OperationRequest) {
index := rand.Uint32() % c.processorSlots
c.processors[index] <- request
} }
Loading…
Cancel
Save