Browse Source

Set volumes ReadOnly if low free disk space

pull/1351/head
Evgenii Kozlov 5 years ago
parent
commit
0e0db70f55
  1. 5
      weed/command/server.go
  2. 18
      weed/command/volume.go
  3. 5
      weed/server/volume_server.go
  4. 27
      weed/storage/disk_location.go
  5. 4
      weed/storage/store.go
  6. 8
      weed/storage/volume.go
  7. 10
      weed/topology/data_node.go
  8. 7
      weed/topology/topology.go

5
weed/command/server.go

@ -55,6 +55,8 @@ var (
serverDisableHttp = cmdServer.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.") serverDisableHttp = cmdServer.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.")
volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]... If set to zero on non-windows OS, the limit will be auto configured.") volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]... If set to zero on non-windows OS, the limit will be auto configured.")
volumeFreeDiskSpaceWatermark = cmdServer.Flag.String("volume.freeDiskSpaceWatermark", "0", "minimum free disk space(in percents). If free disk space lower this value - all volumes marks as ReadOnly")
// pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") // pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer") isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer")
isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway") isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway")
@ -206,7 +208,8 @@ func runServer(cmd *Command, args []string) bool {
// start volume server // start volume server
{ {
go serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption)
go serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption, *volumeFreeDiskSpaceWatermark)
} }
startMaster(masterOptions, serverWhiteList) startMaster(masterOptions, serverWhiteList)

18
weed/command/volume.go

@ -52,6 +52,7 @@ type VolumeServerOptions struct {
memProfile *string memProfile *string
compactionMBPerSecond *int compactionMBPerSecond *int
fileSizeLimitMB *int fileSizeLimitMB *int
freeDiskSpaceWatermark []float32
} }
func init() { func init() {
@ -87,6 +88,7 @@ var (
volumeFolders = cmdVolume.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") volumeFolders = cmdVolume.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]... If set to zero on non-windows OS, the limit will be auto configured.") maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]... If set to zero on non-windows OS, the limit will be auto configured.")
volumeWhiteListOption = cmdVolume.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") volumeWhiteListOption = cmdVolume.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
freeDiskSpaceWatermark = cmdVolume.Flag.String("freeDiskSpaceWatermark", "0", "minimum free disk space(in percents). If free disk space lower this value - all volumes marks as ReadOnly")
) )
func runVolume(cmd *Command, args []string) bool { func runVolume(cmd *Command, args []string) bool {
@ -96,12 +98,12 @@ func runVolume(cmd *Command, args []string) bool {
runtime.GOMAXPROCS(runtime.NumCPU()) runtime.GOMAXPROCS(runtime.NumCPU())
grace.SetupProfiling(*v.cpuProfile, *v.memProfile) grace.SetupProfiling(*v.cpuProfile, *v.memProfile)
v.startVolumeServer(*volumeFolders, *maxVolumeCounts, *volumeWhiteListOption)
v.startVolumeServer(*volumeFolders, *maxVolumeCounts, *volumeWhiteListOption, *freeDiskSpaceWatermark)
return true return true
} }
func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, volumeWhiteListOption string) {
func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, volumeWhiteListOption, freeDiskSpaceWatermark string) {
// Set multiple folders and each folder's max volume count limit' // Set multiple folders and each folder's max volume count limit'
v.folders = strings.Split(volumeFolders, ",") v.folders = strings.Split(volumeFolders, ",")
@ -116,6 +118,16 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
if len(v.folders) != len(v.folderMaxLimits) { if len(v.folders) != len(v.folderMaxLimits) {
glog.Fatalf("%d directories by -dir, but only %d max is set by -max", len(v.folders), len(v.folderMaxLimits)) glog.Fatalf("%d directories by -dir, but only %d max is set by -max", len(v.folders), len(v.folderMaxLimits))
} }
freeDiskSpaceWatermarkStrings := strings.Split(freeDiskSpaceWatermark, ",")
for _, freeString := range freeDiskSpaceWatermarkStrings {
if value, e := strconv.ParseFloat(freeString, 32); e == nil {
v.freeDiskSpaceWatermark = append(v.freeDiskSpaceWatermark, float32(value))
} else {
glog.Fatalf("The value specified in -freeDiskSpaceWatermark not a valid value %s", freeString)
}
}
for _, folder := range v.folders { for _, folder := range v.folders {
if err := util.TestFolderWritable(folder); err != nil { if err := util.TestFolderWritable(folder); err != nil {
glog.Fatalf("Check Data Folder(-dir) Writable %s : %s", folder, err) glog.Fatalf("Check Data Folder(-dir) Writable %s : %s", folder, err)
@ -159,7 +171,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux, volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
*v.ip, *v.port, *v.publicUrl, *v.ip, *v.port, *v.publicUrl,
v.folders, v.folderMaxLimits,
v.folders, v.folderMaxLimits, v.freeDiskSpaceWatermark,
volumeNeedleMapKind, volumeNeedleMapKind,
strings.Split(masters, ","), 5, *v.dataCenter, *v.rack, strings.Split(masters, ","), 5, *v.dataCenter, *v.rack,
v.whiteList, v.whiteList,

5
weed/server/volume_server.go

@ -35,7 +35,7 @@ type VolumeServer struct {
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
port int, publicUrl string, port int, publicUrl string,
folders []string, maxCounts []int,
folders []string, maxCounts []int, freeDiskSpaceWatermark []float32,
needleMapKind storage.NeedleMapType, needleMapKind storage.NeedleMapType,
masterNodes []string, pulseSeconds int, masterNodes []string, pulseSeconds int,
dataCenter string, rack string, dataCenter string, rack string,
@ -68,8 +68,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024, fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
} }
vs.SeedMasterNodes = masterNodes vs.SeedMasterNodes = masterNodes
vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, vs.needleMapKind)
vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, freeDiskSpaceWatermark, vs.needleMapKind)
vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec) vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
handleStaticResources(adminMux) handleStaticResources(adminMux)

27
weed/storage/disk_location.go

@ -2,10 +2,13 @@ package storage
import ( import (
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/stats"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath"
"strings" "strings"
"sync" "sync"
"time"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
@ -15,6 +18,7 @@ import (
type DiskLocation struct { type DiskLocation struct {
Directory string Directory string
MaxVolumeCount int MaxVolumeCount int
FreeDiskSpaceWatermark float32
volumes map[needle.VolumeId]*Volume volumes map[needle.VolumeId]*Volume
volumesLock sync.RWMutex volumesLock sync.RWMutex
@ -23,10 +27,11 @@ type DiskLocation struct {
ecVolumesLock sync.RWMutex ecVolumesLock sync.RWMutex
} }
func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation {
location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount}
func NewDiskLocation(dir string, maxVolumeCount int, freeDiskSpaceWatermark float32) *DiskLocation {
location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount, FreeDiskSpaceWatermark: freeDiskSpaceWatermark}
location.volumes = make(map[needle.VolumeId]*Volume) location.volumes = make(map[needle.VolumeId]*Volume)
location.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume) location.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume)
go location.CheckDiskSpace()
return location return location
} }
@ -293,3 +298,21 @@ func (l *DiskLocation) UnUsedSpace(volumeSizeLimit uint64) (unUsedSpace uint64)
return return
} }
func (l *DiskLocation) CheckDiskSpace() {
lastStat := false
t := time.NewTicker(time.Minute)
for _ = range t.C {
if dir, e := filepath.Abs(l.Directory); e == nil {
s := stats.NewDiskStatus(dir)
if (s.PercentFree < l.FreeDiskSpaceWatermark) != lastStat {
lastStat = !lastStat
for _, v := range l.volumes {
v.SetLowDiskSpace(lastStat)
}
}
}
}
}

4
weed/storage/store.go

@ -48,11 +48,11 @@ func (s *Store) String() (str string) {
return return
} }
func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) {
func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, freeDiskSpaceWatermark []float32, needleMapKind NeedleMapType) (s *Store) {
s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind} s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind}
s.Locations = make([]*DiskLocation, 0) s.Locations = make([]*DiskLocation, 0)
for i := 0; i < len(dirnames); i++ { for i := 0; i < len(dirnames); i++ {
location := NewDiskLocation(dirnames[i], maxVolumeCounts[i])
location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], freeDiskSpaceWatermark[i])
location.loadExistingVolumes(needleMapKind) location.loadExistingVolumes(needleMapKind)
s.Locations = append(s.Locations, location) s.Locations = append(s.Locations, location)
stats.VolumeServerMaxVolumeCounter.Add(float64(maxVolumeCounts[i])) stats.VolumeServerMaxVolumeCounter.Add(float64(maxVolumeCounts[i]))

8
weed/storage/volume.go

@ -27,6 +27,7 @@ type Volume struct {
needleMapKind NeedleMapType needleMapKind NeedleMapType
noWriteOrDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete noWriteOrDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
noWriteCanDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete noWriteCanDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
lowDiskSpace bool
hasRemoteFile bool // if the volume has a remote file hasRemoteFile bool // if the volume has a remote file
MemoryMapMaxSizeMb uint32 MemoryMapMaxSizeMb uint32
@ -45,6 +46,11 @@ type Volume struct {
volumeInfo *volume_server_pb.VolumeInfo volumeInfo *volume_server_pb.VolumeInfo
} }
func (v *Volume) SetLowDiskSpace(lowDiskSpace bool) {
glog.V(0).Infof("SetLowDiskSpace id %d value %t", v.Id, lowDiskSpace)
v.lowDiskSpace = lowDiskSpace
}
func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) { func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
// if replicaPlacement is nil, the superblock will be loaded from disk // if replicaPlacement is nil, the superblock will be loaded from disk
v = &Volume{dir: dirname, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb, v = &Volume{dir: dirname, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb,
@ -244,5 +250,5 @@ func (v *Volume) RemoteStorageNameKey() (storageName, storageKey string) {
} }
func (v *Volume) IsReadOnly() bool { func (v *Volume) IsReadOnly() bool {
return v.noWriteOrDelete || v.noWriteCanDelete
return v.noWriteOrDelete || v.noWriteCanDelete || v.lowDiskSpace
} }

10
weed/topology/data_node.go

@ -41,7 +41,7 @@ func (dn *DataNode) String() string {
return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl) return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl)
} }
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew bool) {
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
dn.Lock() dn.Lock()
defer dn.Unlock() defer dn.Unlock()
if oldV, ok := dn.volumes[v.Id]; !ok { if oldV, ok := dn.volumes[v.Id]; !ok {
@ -64,12 +64,13 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew bool) {
dn.UpAdjustRemoteVolumeCountDelta(-1) dn.UpAdjustRemoteVolumeCountDelta(-1)
} }
} }
isChangedRO = dn.volumes[v.Id].ReadOnly != v.ReadOnly
dn.volumes[v.Id] = v dn.volumes[v.Id] = v
} }
return return
} }
func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes []storage.VolumeInfo) {
func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changeRO []storage.VolumeInfo) {
actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo) actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo)
for _, v := range actualVolumes { for _, v := range actualVolumes {
actualVolumeMap[v.Id] = v actualVolumeMap[v.Id] = v
@ -91,10 +92,13 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume
} }
dn.Unlock() dn.Unlock()
for _, v := range actualVolumes { for _, v := range actualVolumes {
isNew := dn.AddOrUpdateVolume(v)
isNew, isChangedRO := dn.AddOrUpdateVolume(v)
if isNew { if isNew {
newVolumes = append(newVolumes, v) newVolumes = append(newVolumes, v)
} }
if isChangedRO {
changeRO = append(changeRO, v)
}
} }
return return
} }

7
weed/topology/topology.go

@ -212,13 +212,18 @@ func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformati
} }
} }
// find out the delta volumes // find out the delta volumes
newVolumes, deletedVolumes = dn.UpdateVolumes(volumeInfos)
var changedVolumes []storage.VolumeInfo
newVolumes, deletedVolumes, changedVolumes = dn.UpdateVolumes(volumeInfos)
for _, v := range newVolumes { for _, v := range newVolumes {
t.RegisterVolumeLayout(v, dn) t.RegisterVolumeLayout(v, dn)
} }
for _, v := range deletedVolumes { for _, v := range deletedVolumes {
t.UnRegisterVolumeLayout(v, dn) t.UnRegisterVolumeLayout(v, dn)
} }
for _, v := range changedVolumes {
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
vl.ensureCorrectWritables(&v)
}
return return
} }

Loading…
Cancel
Save