
filer: change to saveToFilerLimit from cacheToFilerLimit

short circuit saving small files to volume server

Chris Lu, 4 years ago (branch pull/1743/head, commit 1efb51ba84)

4 changed files:

  1. weed/command/filer.go (28 lines changed)
  2. weed/command/server.go (2 lines changed)
  3. weed/server/filer_server.go (24 lines changed)
  4. weed/server/filer_server_handlers_write_autochunk.go (14 lines changed)
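In effect, a file whose content fits under the limit is now written inline into the filer store entry and never uploaded to a volume server; previously the data was uploaded to a volume server first and only additionally kept in the filer when under cacheToFilerLimit. A hedged usage sketch (the 1024-byte threshold is a hypothetical value; the flag names are taken from the diffs below):

  weed filer -saveToFilerLimit=1024
  weed server -filer.saveToFilerLimit=1024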

weed/command/filer.go (28 lines changed)

@@ -42,7 +42,7 @@ type FilerOptions struct {
 	cipher                  *bool
 	peers                   *string
 	metricsHttpPort         *int
-	cacheToFilerLimit       *int
+	saveToFilerLimit        *int
 	defaultLevelDbDirectory *string
 }
@@ -64,7 +64,7 @@ func init() {
 	f.cipher = cmdFiler.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers")
 	f.peers = cmdFiler.Flag.String("peers", "", "all filers sharing the same filer store in comma separated ip:port list")
 	f.metricsHttpPort = cmdFiler.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
-	f.cacheToFilerLimit = cmdFiler.Flag.Int("cacheToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.")
+	f.saveToFilerLimit = cmdFiler.Flag.Int("saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store")
 	f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory")

 	// start s3 on filer
@@ -139,18 +139,18 @@ func (fo *FilerOptions) startFiler() {
 		Masters:            strings.Split(*fo.masters, ","),
 		Collection:         *fo.collection,
 		DefaultReplication: *fo.defaultReplicaPlacement,
-		DisableDirListing:  *fo.disableDirListing,
-		MaxMB:              *fo.maxMB,
-		DirListingLimit:    *fo.dirListingLimit,
-		DataCenter:         *fo.dataCenter,
-		Rack:               *fo.rack,
-		DefaultLevelDbDir:  defaultLevelDbDirectory,
-		DisableHttp:        *fo.disableHttp,
-		Host:               *fo.ip,
-		Port:               uint32(*fo.port),
-		Cipher:             *fo.cipher,
-		CacheToFilerLimit:  int64(*fo.cacheToFilerLimit),
-		Filers:             peers,
+		DisableDirListing:  *fo.disableDirListing,
+		MaxMB:              *fo.maxMB,
+		DirListingLimit:    *fo.dirListingLimit,
+		DataCenter:         *fo.dataCenter,
+		Rack:               *fo.rack,
+		DefaultLevelDbDir:  defaultLevelDbDirectory,
+		DisableHttp:        *fo.disableHttp,
+		Host:               *fo.ip,
+		Port:               uint32(*fo.port),
+		Cipher:             *fo.cipher,
+		SaveToFilerLimit:   *fo.saveToFilerLimit,
+		Filers:             peers,
 	})
 	if nfs_err != nil {
 		glog.Fatalf("Filer startup error: %v", nfs_err)

weed/command/server.go (2 lines changed)

@@ -95,7 +95,7 @@ func init() {
 	filerOptions.dirListingLimit = cmdServer.Flag.Int("filer.dirListLimit", 1000, "limit sub dir listing size")
 	filerOptions.cipher = cmdServer.Flag.Bool("filer.encryptVolumeData", false, "encrypt data on volume servers")
 	filerOptions.peers = cmdServer.Flag.String("filer.peers", "", "all filers sharing the same filer store in comma separated ip:port list")
-	filerOptions.cacheToFilerLimit = cmdServer.Flag.Int("filer.cacheToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.")
+	filerOptions.saveToFilerLimit = cmdServer.Flag.Int("filer.saveToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.")

 	serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
 	serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port")

weed/server/filer_server.go (24 lines changed)

@@ -46,18 +46,18 @@ type FilerOption struct {
 	Collection         string
 	DefaultReplication string
 	DisableDirListing  bool
-	MaxMB              int
-	DirListingLimit    int
-	DataCenter         string
-	Rack               string
-	DefaultLevelDbDir  string
-	DisableHttp        bool
-	Host               string
-	Port               uint32
-	recursiveDelete    bool
-	Cipher             bool
-	CacheToFilerLimit  int64
-	Filers             []string
+	MaxMB              int
+	DirListingLimit    int
+	DataCenter         string
+	Rack               string
+	DefaultLevelDbDir  string
+	DisableHttp        bool
+	Host               string
+	Port               uint32
+	recursiveDelete    bool
+	Cipher             bool
+	SaveToFilerLimit   int
+	Filers             []string
 }

 type FilerServer struct {
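Note the renamed field is now a plain int (the old CacheToFilerLimit was int64, which is why the int64() conversion disappeared from weed/command/filer.go above). A minimal sketch of populating the renamed option field, assuming the repository's module path and package name at the time of this commit; all values are hypothetical and other fields are elided:

package main

import (
	weed_server "github.com/chrislusf/seaweedfs/weed/server"
)

func main() {
	option := &weed_server.FilerOption{
		Host:             "localhost", // hypothetical values throughout
		Port:             8888,
		SaveToFilerLimit: 1024, // content under 1 KiB stays in the filer store
	}
	_ = option // would be handed to the filer server constructor
}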

weed/server/filer_server_handlers_write_autochunk.go (14 lines changed)

@@ -207,7 +207,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
 	var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))

 	chunkOffset := int64(0)
-	var smallContent, content []byte
+	var smallContent []byte

 	for {
 		limitedReader := io.LimitReader(partReader, int64(chunkSize))
@@ -216,6 +216,13 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
 		if err != nil {
 			return nil, nil, 0, err, nil
 		}
+		if chunkOffset == 0 {
+			if len(data) < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 {
+				smallContent = data
+				chunkOffset += int64(len(data))
+				break
+			}
+		}

 		dataReader := util.NewBytesReader(data)

 		// retry to assign a different file id
@@ -242,8 +249,6 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
 			return nil, nil, 0, uploadErr, nil
 		}

-		content = data
-
 		// if last chunk exhausted the reader exactly at the border
 		if uploadResult.Size == 0 {
 			break
@@ -263,9 +268,6 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
 		}
 	}

-	if chunkOffset < fs.option.CacheToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && chunkOffset < 4*1024 {
-		smallContent = content
-	}
 	return fileChunks, md5Hash, chunkOffset, nil, smallContent
 }
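One subtlety in the new first-chunk check: Go's && binds tighter than ||, so the 4*1024 cap applies only to paths under the filer's /etc/ configuration tree, while all other paths are governed solely by SaveToFilerLimit. A minimal standalone sketch of the predicate; the etcRoot constant stands in for filer.DirectoryEtcRoot and its exact value is an assumption here:

package main

import (
	"fmt"
	"strings"
)

// saveInline mirrors the first-chunk check added in this commit: inline the
// content when it is under the configured limit, or when the path lives under
// the filer's /etc/ configuration tree and is under 4 KiB. Because && binds
// tighter than ||, the 4 KiB cap applies only to the /etc/ branch.
func saveInline(urlPath string, firstChunkLen, saveToFilerLimit int) bool {
	const etcRoot = "/etc/" // assumed value of filer.DirectoryEtcRoot
	return firstChunkLen < saveToFilerLimit ||
		strings.HasPrefix(urlPath, etcRoot) && firstChunkLen < 4*1024
}

func main() {
	fmt.Println(saveInline("/buckets/b/tiny.txt", 512, 1024))     // true: stored inline in the filer
	fmt.Println(saveInline("/etc/seaweedfs/filer.conf", 2048, 0)) // true: /etc/ special case
	fmt.Println(saveInline("/buckets/b/large.bin", 8192, 1024))   // false: chunks go to volume servers
}

The check runs only for the very first chunk (chunkOffset == 0) and breaks out of the loop on success; once it fails, all content goes through the normal volume-server upload path, and the old post-loop caching step is gone entirely.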
