
[stats]: rewrite s3 stats collector to return shutdown func

Branch: pull/7236/head
Author: ptukha, 1 month ago
Commit: 7cfe6e20a5
  1. weed/command/master.go (15 changed lines)
  2. weed/server/master_server.go (90 changed lines)

weed/command/master.go (15 changed lines)

@@ -231,6 +231,8 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
 	}
 	go grpcS.Serve(grpcL)
+	rootCtx, rootCancel := context.WithCancel(context.Background())
 	timeSleep := 1500 * time.Millisecond
 	if !*masterOption.raftHashicorp {
 		go func() {
@@ -245,8 +247,8 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
 		}()
 	}
-	go ms.MasterClient.KeepConnectedToMaster(context.Background())
-	go weed_server.CollectCollectionsStats(context.Background(), ms, *masterOption.intervalToCollectStats)
+	go ms.MasterClient.KeepConnectedToMaster(rootCtx)
+	statsShutdown := weed_server.CollectCollectionsStats(rootCtx, ms, *masterOption.intervalToCollectStats)
 	// start http server
 	var (
@@ -286,6 +288,15 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
 	grace.OnInterrupt(ms.Shutdown)
 	grace.OnInterrupt(grpcS.Stop)
+	grace.OnInterrupt(func() {
+		rootCancel()
+		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+		defer cancel()
+		if err := statsShutdown(ctx); err != nil {
+			glog.Errorf("graceful shutdown of collections stats: %v", err)
+		}
+	})
 	grace.OnReload(func() {
 		if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader {
 			ms.Topo.HashicorpRaft.LeadershipTransfer()

weed/server/master_server.go (90 changed lines)

@@ -35,6 +35,7 @@ import (
 	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
 	"github.com/seaweedfs/seaweedfs/weed/util/version"
 	"github.com/seaweedfs/seaweedfs/weed/wdclient"
+	"math"
 )

 const (
@@ -436,49 +437,70 @@ func (ms *MasterServer) Reload() {
 	)
 }
-func CollectCollectionsStats(ctx context.Context, ms *MasterServer, intervalToCollect time.Duration) {
-	ticker := time.NewTicker(intervalToCollect)
-	defer ticker.Stop()
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case <-ticker.C:
-			volumeList, err := ms.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
-			if err != nil {
-				glog.Errorf("collect volume list: %v", err)
-			}
-			collectionInfos := make(map[string]*shell.CollectionInfo)
-			for _, dc := range volumeList.TopologyInfo.DataCenterInfos {
-				for _, r := range dc.RackInfos {
-					for _, dn := range r.DataNodeInfos {
-						for _, diskInfo := range dn.DiskInfos {
-							for _, vif := range diskInfo.VolumeInfos {
-								c := vif.Collection
-								cif, found := collectionInfos[c]
-								if !found {
-									cif = &shell.CollectionInfo{}
-									collectionInfos[c] = cif
-								}
-								replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vif.ReplicaPlacement))
-								copyCount := float64(replicaPlacement.GetCopyCount())
-								cif.Size += float64(vif.Size) / copyCount
-								cif.DeleteCount += float64(vif.DeleteCount) / copyCount
-								cif.FileCount += float64(vif.FileCount) / copyCount
-								cif.DeletedByteCount += float64(vif.DeletedByteCount) / copyCount
-								cif.VolumeCount++
-							}
-						}
-					}
-				}
-			}
-			for k, v := range collectionInfos {
-				stats.S3BucketFileCount.WithLabelValues(k).Set(v.FileCount - v.DeleteCount)
-				stats.S3BucketSize.WithLabelValues(k).Set(v.Size - v.DeletedByteCount)
-			}
-		}
-	}
-}
+func CollectCollectionsStats(parent context.Context, ms *MasterServer, interval time.Duration) (shutdown func(ctx context.Context) error) {
+	ctx, cancel := context.WithCancel(parent)
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		ticker := time.NewTicker(interval)
+		defer ticker.Stop()
+		req := &master_pb.VolumeListRequest{}
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+				volumeList, err := ms.VolumeList(ctx, req)
+				if err != nil {
+					glog.Errorf("collect volume list: %v", err)
+					continue
+				}
+				if volumeList == nil || volumeList.TopologyInfo == nil {
+					continue
+				}
+				collectionInfos := make(map[string]*shell.CollectionInfo)
+				for _, dc := range volumeList.TopologyInfo.DataCenterInfos {
+					for _, r := range dc.RackInfos {
+						for _, dn := range r.DataNodeInfos {
+							for _, diskInfo := range dn.DiskInfos {
+								for _, vif := range diskInfo.VolumeInfos {
+									c := vif.Collection
+									cif := collectionInfos[c]
+									if cif == nil {
+										cif = &shell.CollectionInfo{}
+										collectionInfos[c] = cif
+									}
+									replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vif.ReplicaPlacement))
+									copyCount := float64(replicaPlacement.GetCopyCount())
+									cif.Size += float64(vif.Size) / copyCount
+									cif.DeleteCount += float64(vif.DeleteCount) / copyCount
+									cif.FileCount += float64(vif.FileCount) / copyCount
+									cif.DeletedByteCount += float64(vif.DeletedByteCount) / copyCount
+									cif.VolumeCount++
+								}
+							}
+						}
+					}
+				}
+				for bucket, v := range collectionInfos {
+					stats.S3BucketFileCount.WithLabelValues(bucket).Set(math.Round(v.FileCount - v.DeleteCount))
+					stats.S3BucketSize.WithLabelValues(bucket).Set(math.Round(v.Size - v.DeletedByteCount))
+				}
+			}
+		}
+	}()
+	return func(ctx context.Context) error {
+		cancel()
+		select {
+		case <-done:
+			return nil
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+}
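
Below is a minimal, self-contained sketch of the pattern this commit adopts: the constructor starts its own goroutine and returns a shutdown function that cancels the loop and then waits for it to exit, bounded by a caller-supplied context. The names here (startWorker, the placeholder work comment) are illustrative only and are not part of the SeaweedFS code.

package main

import (
	"context"
	"fmt"
	"time"
)

// startWorker runs a ticker loop in the background and returns a shutdown
// function. shutdown cancels the loop, then waits for it to finish or for
// the caller's context to expire, whichever happens first.
func startWorker(parent context.Context, interval time.Duration) (shutdown func(ctx context.Context) error) {
	ctx, cancel := context.WithCancel(parent)
	done := make(chan struct{})
	go func() {
		defer close(done)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// periodic work goes here (e.g. collecting stats)
			}
		}
	}()
	return func(ctx context.Context) error {
		cancel() // signal the loop to stop
		select {
		case <-done: // loop observed cancellation and exited
			return nil
		case <-ctx.Done(): // caller's wait budget ran out first
			return ctx.Err()
		}
	}
}

func main() {
	shutdown := startWorker(context.Background(), time.Second)

	// ... application runs ...

	waitCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := shutdown(waitCtx); err != nil {
		fmt.Printf("worker shutdown: %v\n", err)
	}
}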