Browse Source

Add memory, disk, load and process stats to the volume info in heartbeats, so that masters get more details about volumes

pull/843/head
bingoohuang 7 years ago
parent
commit
b8abcb3b5e
  1. 986
      weed/pb/filer_pb/filer.pb.go
  2. 16
      weed/pb/master.proto
  3. 829
      weed/pb/master_pb/master.pb.go
  4. 1130
      weed/pb/volume_server_pb/volume_server.pb.go
  5. 4
      weed/server/master_grpc_server.go
  6. 25
      weed/storage/store.go
  7. 108
      weed/storage/sysstat.go
  8. 50
      weed/storage/sysstat_test.go
  9. 29
      weed/storage/volume_info.go
  10. 3
      weed/storage/volume_super_block.go
  11. 2
      weed/topology/store_replicate.go
  12. 6
      weed/topology/topology.go
  13. 2
      weed/topology/topology_event_handling.go

986
weed/pb/filer_pb/filer.pb.go
File diff suppressed because it is too large
View File

16
weed/pb/master.proto

@ -51,6 +51,22 @@ message VolumeInformationMessage {
uint32 replica_placement = 8;
uint32 version = 9;
uint32 ttl = 10;
uint64 memory_total = 11;
uint64 memory_free = 12;
uint64 disk_watermark = 13;
uint64 disk_total = 14;
uint64 disk_free = 15;
string disk_device = 16;
string disk_mount_point = 17;
double load1 = 18;
double load5 = 19;
double load15 = 20;
double process_cpu_usage = 21;
uint64 process_rss = 22;
}
message Empty {

829
weed/pb/master_pb/master.pb.go
File diff suppressed because it is too large
View File

1130
weed/pb/volume_server_pb/volume_server.pb.go
File diff suppressed because it is too large
View File

4
weed/server/master_grpc_server.go

@ -61,9 +61,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
dcName, rackName := t.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
dc := t.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
dn = rack.GetOrCreateDataNode(heartbeat.Ip,
int(heartbeat.Port), heartbeat.PublicUrl,
int(heartbeat.MaxVolumeCount))
dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), heartbeat.PublicUrl, int(heartbeat.MaxVolumeCount))
glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
if err := stream.Send(&master_pb.HeartbeatResponse{
VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024,

25
weed/storage/store.go

@ -27,6 +27,8 @@ type Store struct {
NeedleMapType NeedleMapType
NewVolumeIdChan chan VolumeId
DeletedVolumeIdChan chan VolumeId
DiskWatermark uint64
}
func (s *Store) String() (str string) {
@ -143,14 +145,23 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
var volumeMessages []*master_pb.VolumeInformationMessage
maxVolumeCount := 0
var maxFileKey NeedleId
memTotal, memFree, _ := MemoryStat()
load1, load5, load15, _ := LoadStat()
_, processCpuUsage, proessRss, _ := ProcessStat()
for _, location := range s.Locations {
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
location.Lock()
diskTotal, diskFree, diskDevice, diskMountPoint, _ := DiskStat(location.Directory)
for k, v := range location.volumes {
if maxFileKey < v.nm.MaxFileKey() {
maxFileKey = v.nm.MaxFileKey()
}
if !v.expired(s.VolumeSizeLimit) {
volumeMessage := &master_pb.VolumeInformationMessage{
Id: uint32(k),
Size: uint64(v.Size()),
@ -162,6 +173,20 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
Version: uint32(v.Version()),
Ttl: v.Ttl.ToUint32(),
MemoryTotal: memTotal,
MemoryFree: memFree,
DiskWatermark: s.DiskWatermark,
DiskTotal: diskTotal,
DiskFree: diskFree,
DiskDevice: diskDevice,
DiskMountPoint: diskMountPoint,
Load1: load1,
Load5: load5,
Load15: load15,
ProcessCpuUsage: processCpuUsage,
ProcessRss: proessRss,
}
volumeMessages = append(volumeMessages, volumeMessage)
} else {

108
weed/storage/sysstat.go

@ -0,0 +1,108 @@
package storage
import (
"fmt"
"github.com/shirou/gopsutil/disk"
"github.com/shirou/gopsutil/load"
"github.com/shirou/gopsutil/mem"
"github.com/shirou/gopsutil/process"
"os"
"path/filepath"
"syscall"
)
// MemoryStat reports the total and free memory of the host, as read from
// mem.VirtualMemory. When that call fails, both values are zero and the
// underlying error is returned.
func MemoryStat() (total, free uint64, err error) {
	vm, statErr := mem.VirtualMemory()
	if statErr == nil {
		return vm.Total, vm.Free, nil
	}
	return 0, 0, statErr
}
// DiskStat reports usage and placement information for the file system that
// holds dir.
//
// total and free come from disk.Usage on the absolute form of dir.
// mountPoint is the value returned by MountPoint for that absolute path, and
// device is the Device of the first partition from disk.Partitions(false)
// whose Mountpoint equals mountPoint (empty when no partition matches).
//
// On any error all results are zero values and err is non-nil.
func DiskStat(dir string) (total, free uint64, device, mountPoint string, err error) {
	absPath, err := filepath.Abs(dir)
	if err != nil {
		return 0, 0, "", "", fmt.Errorf("resolve absolute path of %q: %v", dir, err)
	}

	usage, err := disk.Usage(absPath)
	if err != nil {
		return 0, 0, "", "", err
	}

	mountPoint, err = MountPoint(absPath)
	if err != nil {
		return 0, 0, "", "", err
	}

	// The device lookup is best effort: a failure to list partitions only
	// leaves device empty, it does not invalidate the usage numbers.
	partitions, _ := disk.Partitions(false)
	for _, p := range partitions {
		if p.Mountpoint == mountPoint {
			device = p.Device
			break
		}
	}

	return usage.Total, usage.Free, device, mountPoint, nil
}
// LoadStat reports the host's load1, load5 and load15 averages as read from
// load.Avg. When that call fails, all three values are zero and the
// underlying error is returned.
func LoadStat() (load1, load5, load15 float64, err error) {
	avg, avgErr := load.Avg()
	if avgErr == nil {
		return avg.Load1, avg.Load5, avg.Load15, nil
	}
	return 0, 0, 0, avgErr
}
// ProcessStat reports the name, CPU usage and RSS of the current process
// (looked up by os.Getpid). If any of the underlying queries fails, the zero
// values are returned together with that error.
func ProcessStat() (name string, cpuUsage float64, rss uint64, err error) {
	self, err := process.NewProcess(int32(os.Getpid()))
	if err != nil {
		return "", 0, 0, err
	}

	memInfo, err := self.MemoryInfo()
	if err != nil {
		return "", 0, 0, err
	}

	if cpuUsage, err = self.CPUPercent(); err != nil {
		return "", 0, 0, err
	}

	if name, err = self.Name(); err != nil {
		return "", 0, 0, err
	}

	return name, cpuUsage, memInfo.RSS, nil
}
// MountPoint returns the topmost ancestor directory of absPath that lives on
// the same device as absPath itself, i.e. the directory at which the device
// number reported by stat changes (or the file system root).
//
// absPath is expected to be absolute; a relative path is resolved against the
// current working directory first so that the upward walk always terminates.
// An error is returned when a path on the way cannot be stat'ed or when the
// platform does not expose a *syscall.Stat_t.
func MountPoint(absPath string) (string, error) {
	current, err := filepath.Abs(absPath)
	if err != nil {
		return "", err
	}

	// deviceOf stats path and extracts its device number, failing cleanly
	// instead of panicking when the platform-specific stat type is missing.
	deviceOf := func(path string) (uint64, error) {
		info, err := os.Stat(path)
		if err != nil {
			return 0, err
		}
		st, ok := info.Sys().(*syscall.Stat_t)
		if !ok {
			return 0, fmt.Errorf("stat %s: unsupported system stat type %T", path, info.Sys())
		}
		return uint64(st.Dev), nil
	}

	origDev, err := deviceOf(current)
	if err != nil {
		return "", err
	}

	for {
		parent := filepath.Dir(current)
		if parent == current {
			// Reached the root: there is nothing above to compare against.
			break
		}
		parentDev, err := deviceOf(parent)
		if err != nil {
			return "", err
		}
		if parentDev != origDev {
			// The parent is on another device, so current is the mount point.
			break
		}
		current = parent
	}
	return current, nil
}

50
weed/storage/sysstat_test.go

@ -0,0 +1,50 @@
package storage
import (
"fmt"
"github.com/stretchr/testify/assert"
"testing"
)
// TestMemoryStat checks that MemoryStat succeeds and reports a non-zero
// total; on failure the observed values are printed for diagnosis.
func TestMemoryStat(t *testing.T) {
	memTotal, memFree, statErr := MemoryStat()
	assert.Nil(t, statErr)
	if memTotal > 0 {
		return
	}
	println("total", memTotal, "free", memFree)
	t.Fail()
}
// TestDiskStat checks that DiskStat succeeds for the parent directory and
// reports a non-zero total, then prints the resolved device and mount point.
func TestDiskStat(t *testing.T) {
	diskTotal, diskFree, dev, mount, statErr := DiskStat("..")
	assert.Nil(t, statErr)
	if diskTotal == 0 {
		println("total", diskTotal, "free", diskFree)
		t.Fail()
	}
	fmt.Println("device", dev, "mountPoint", mount)
}
// TestAvgLoad checks that LoadStat succeeds and reports sane load averages.
func TestAvgLoad(t *testing.T) {
	load1, load5, load15, err := LoadStat()
	assert.Nil(t, err)
	// A load average of exactly zero is legitimate on an idle host, so only
	// negative values are treated as a failure.
	if load1 < 0 || load5 < 0 || load15 < 0 {
		t.Errorf("negative load average: load1=%v load5=%v load15=%v", load1, load5, load15)
	}
}
// TestProcessStat checks that ProcessStat succeeds and reports a non-zero
// RSS for the test process; on failure the observed values are printed.
func TestProcessStat(t *testing.T) {
	procName, cpu, residentSize, statErr := ProcessStat()
	assert.Nil(t, statErr)
	if residentSize > 0 {
		return
	}
	println("name", procName, "cpuUsage", cpu, "RSS", residentSize)
	t.Fail()
}

29
weed/storage/volume_info.go

@ -18,6 +18,21 @@ type VolumeInfo struct {
DeleteCount int
DeletedByteCount uint64
ReadOnly bool
MemoryTotal uint64
MemoryFree uint64
DiskWatermark uint64
DiskTotal uint64
DiskFree uint64
DiskDevice string
DiskMountPoint string
Load1 float64
Load5 float64
Load15 float64
ProcessCpuUsage float64
ProcessRss uint64
}
func NewVolumeInfo(m *master_pb.VolumeInformationMessage) (vi VolumeInfo, err error) {
@ -30,6 +45,20 @@ func NewVolumeInfo(m *master_pb.VolumeInformationMessage) (vi VolumeInfo, err er
DeletedByteCount: m.DeletedByteCount,
ReadOnly: m.ReadOnly,
Version: Version(m.Version),
MemoryTotal: m.MemoryTotal,
MemoryFree: m.MemoryFree,
DiskWatermark: m.DiskWatermark,
DiskTotal: m.DiskTotal,
DiskFree: m.DiskFree,
DiskDevice: m.DiskDevice,
DiskMountPoint: m.DiskMountPoint,
Load1: m.Load1,
Load5: m.Load5,
Load15: m.Load15,
ProcessCpuUsage: m.ProcessCpuUsage,
ProcessRss: m.ProcessRss,
}
rp, e := NewReplicaPlacementFromByte(byte(m.ReplicaPlacement))
if e != nil {

3
weed/storage/volume_super_block.go

@ -2,12 +2,11 @@ package storage
import (
"fmt"
"os"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
"os"
)
const (

2
weed/topology/store_replicate.go

@ -135,7 +135,7 @@ type RemoteResult struct {
func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) error) error {
if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil {
length := 0
selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port))
selfUrl := store.Ip + ":" + strconv.Itoa(store.Port)
results := make(chan RemoteResult)
for _, location := range lookupResult.Locations {
if location.Url != selfUrl {

6
weed/topology/topology.go

@ -101,12 +101,12 @@ func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool {
}
func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) {
vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option)
if err != nil || datanodes.Length() == 0 {
vid, count, dataNodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option)
if err != nil || dataNodes.Length() == 0 {
return "", 0, nil, errors.New("No writable volumes available!")
}
fileId, count := t.Sequence.NextFileId(count)
return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, dataNodes.Head(), nil
}
func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {

2
weed/topology/topology_event_handling.go

@ -20,7 +20,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold float64, preallo
}()
go func(garbageThreshold float64) {
c := time.Tick(15 * time.Minute)
for _ = range c {
for range c {
if t.IsLeader() {
t.Vacuum(garbageThreshold, preallocate)
}

Loading…
Cancel
Save