diff --git a/weed/command/master.go b/weed/command/master.go
index acfa9484a..3b1986536 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -231,6 +231,8 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
 	}
 	go grpcS.Serve(grpcL)
 
+	rootCtx, rootCancel := context.WithCancel(context.Background())
+
 	timeSleep := 1500 * time.Millisecond
 	if !*masterOption.raftHashicorp {
 		go func() {
@@ -245,8 +247,8 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
 		}()
 	}
 
-	go ms.MasterClient.KeepConnectedToMaster(context.Background())
-	go weed_server.CollectCollectionsStats(context.Background(), ms, *masterOption.intervalToCollectStats)
+	go ms.MasterClient.KeepConnectedToMaster(rootCtx)
+	statsShutdown := weed_server.CollectCollectionsStats(rootCtx, ms, *masterOption.intervalToCollectStats)
 
 	// start http server
 	var (
@@ -286,6 +288,15 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
 
 	grace.OnInterrupt(ms.Shutdown)
 	grace.OnInterrupt(grpcS.Stop)
+	grace.OnInterrupt(func() {
+		rootCancel()
+
+		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+		defer cancel()
+		if err := statsShutdown(ctx); err != nil {
+			glog.Errorf("graceful shutdown of collections stats: %v", err)
+		}
+	})
 	grace.OnReload(func() {
 		if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader {
 			ms.Topo.HashicorpRaft.LeadershipTransfer()
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 9a095a303..ecead85e6 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -35,6 +35,7 @@ import (
 	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
 	"github.com/seaweedfs/seaweedfs/weed/util/version"
 	"github.com/seaweedfs/seaweedfs/weed/wdclient"
+	"math"
 )
 
 const (
@@ -436,49 +437,70 @@ func (ms *MasterServer) Reload() {
 	)
 }
 
-func CollectCollectionsStats(ctx context.Context, ms *MasterServer, intervalToCollect time.Duration) {
-	ticker := time.NewTicker(intervalToCollect)
-	defer ticker.Stop()
+func CollectCollectionsStats(parent context.Context, ms *MasterServer, interval time.Duration) (shutdown func(ctx context.Context) error) {
+	ctx, cancel := context.WithCancel(parent)
+	done := make(chan struct{})
 
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case <-ticker.C:
-			volumeList, err := ms.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
-			if err != nil {
-				glog.Errorf("collect volume list: %v", err)
-			}
+	go func() {
+		defer close(done)
+		ticker := time.NewTicker(interval)
+		defer ticker.Stop()
 
-			collectionInfos := make(map[string]*shell.CollectionInfo)
-			for _, dc := range volumeList.TopologyInfo.DataCenterInfos {
-				for _, r := range dc.RackInfos {
-					for _, dn := range r.DataNodeInfos {
-						for _, diskInfo := range dn.DiskInfos {
-							for _, vif := range diskInfo.VolumeInfos {
-								c := vif.Collection
-								cif, found := collectionInfos[c]
-								if !found {
-									cif = &shell.CollectionInfo{}
-									collectionInfos[c] = cif
+		req := &master_pb.VolumeListRequest{}
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+				volumeList, err := ms.VolumeList(ctx, req)
+				if err != nil {
+					glog.Errorf("collect volume list: %v", err)
+					continue
+				}
+				if volumeList == nil || volumeList.TopologyInfo == nil {
+					continue
+				}
+
+				collectionInfos := make(map[string]*shell.CollectionInfo)
+				for _, dc := range volumeList.TopologyInfo.DataCenterInfos {
+					for _, r := range dc.RackInfos {
+						for _, dn := range r.DataNodeInfos {
+							for _, diskInfo := range dn.DiskInfos {
+								for _, vif := range diskInfo.VolumeInfos {
+									c := vif.Collection
+									cif := collectionInfos[c]
+									if cif == nil {
+										cif = &shell.CollectionInfo{}
+										collectionInfos[c] = cif
+									}
+									replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vif.ReplicaPlacement))
+									copyCount := float64(replicaPlacement.GetCopyCount())
+									cif.Size += float64(vif.Size) / copyCount
+									cif.DeleteCount += float64(vif.DeleteCount) / copyCount
+									cif.FileCount += float64(vif.FileCount) / copyCount
+									cif.DeletedByteCount += float64(vif.DeletedByteCount) / copyCount
+									cif.VolumeCount++
 								}
-								replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vif.ReplicaPlacement))
-								copyCount := float64(replicaPlacement.GetCopyCount())
-								cif.Size += float64(vif.Size) / copyCount
-								cif.DeleteCount += float64(vif.DeleteCount) / copyCount
-								cif.FileCount += float64(vif.FileCount) / copyCount
-								cif.DeletedByteCount += float64(vif.DeletedByteCount) / copyCount
-								cif.VolumeCount++
 							}
 						}
 					}
 				}
-			}
 
-			for k, v := range collectionInfos {
-				stats.S3BucketFileCount.WithLabelValues(k).Set(v.FileCount - v.DeleteCount)
-				stats.S3BucketSize.WithLabelValues(k).Set(v.Size - v.DeletedByteCount)
+				for bucket, v := range collectionInfos {
+					stats.S3BucketFileCount.WithLabelValues(bucket).Set(math.Round(v.FileCount - v.DeleteCount))
+					stats.S3BucketSize.WithLabelValues(bucket).Set(math.Round(v.Size - v.DeletedByteCount))
+				}
 			}
 		}
+	}()
+
+	return func(ctx context.Context) error {
+		cancel()
+		select {
+		case <-done:
+			return nil
+		case <-ctx.Done():
+			return ctx.Err()
+		}
 	}
 }