diff --git a/weed/command/master.go b/weed/command/master.go index 8e10d25a2..3b1986536 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -49,21 +49,22 @@ type MasterOptions struct { volumePreallocate *bool maxParallelVacuumPerServer *int // pulseSeconds *int - defaultReplication *string - garbageThreshold *float64 - whiteList *string - disableHttp *bool - metricsAddress *string - metricsIntervalSec *int - raftResumeState *bool - metricsHttpPort *int - metricsHttpIp *string - heartbeatInterval *time.Duration - electionTimeout *time.Duration - raftHashicorp *bool - raftBootstrap *bool - telemetryUrl *string - telemetryEnabled *bool + defaultReplication *string + garbageThreshold *float64 + whiteList *string + disableHttp *bool + metricsAddress *string + metricsIntervalSec *int + raftResumeState *bool + metricsHttpPort *int + metricsHttpIp *string + heartbeatInterval *time.Duration + electionTimeout *time.Duration + raftHashicorp *bool + raftBootstrap *bool + telemetryUrl *string + telemetryEnabled *bool + intervalToCollectStats *time.Duration } func init() { @@ -93,6 +94,7 @@ func init() { m.raftBootstrap = cmdMaster.Flag.Bool("raftBootstrap", false, "Whether to bootstrap the Raft cluster") m.telemetryUrl = cmdMaster.Flag.String("telemetry.url", "https://telemetry.seaweedfs.com/api/collect", "telemetry server URL to send usage statistics") m.telemetryEnabled = cmdMaster.Flag.Bool("telemetry", false, "enable telemetry reporting") + m.intervalToCollectStats = cmdMaster.Flag.Duration("intervalToCollectStats", time.Second*120, "interval to collect s3 stats") } var cmdMaster = &Command{ @@ -229,6 +231,8 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { } go grpcS.Serve(grpcL) + rootCtx, rootCancel := context.WithCancel(context.Background()) + timeSleep := 1500 * time.Millisecond if !*masterOption.raftHashicorp { go func() { @@ -243,7 +247,8 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { }() } - go ms.MasterClient.KeepConnectedToMaster(context.Background()) + go ms.MasterClient.KeepConnectedToMaster(rootCtx) + statsShutdown := weed_server.CollectCollectionsStats(rootCtx, ms, *masterOption.intervalToCollectStats) // start http server var ( @@ -283,6 +288,15 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { grace.OnInterrupt(ms.Shutdown) grace.OnInterrupt(grpcS.Stop) + grace.OnInterrupt(func() { + rootCancel() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := statsShutdown(ctx); err != nil { + glog.Errorf("graceful shutdown of collections stats: %v", err) + } + }) grace.OnReload(func() { if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader { ms.Topo.HashicorpRaft.LeadershipTransfer() diff --git a/weed/command/server.go b/weed/command/server.go index f2e2e1b58..78274d95a 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -109,6 +109,7 @@ func init() { masterOptions.electionTimeout = cmdServer.Flag.Duration("master.electionTimeout", 10*time.Second, "election timeout of master servers") masterOptions.telemetryUrl = cmdServer.Flag.String("master.telemetry.url", "https://telemetry.seaweedfs.com/api/collect", "telemetry server URL to send usage statistics") masterOptions.telemetryEnabled = cmdServer.Flag.Bool("master.telemetry", false, "enable telemetry reporting") + masterOptions.intervalToCollectStats = cmdServer.Flag.Duration("master.intervalToCollectStats", time.Second*120, "interval to collect s3 stats") filerOptions.filerGroup = cmdServer.Flag.String("filer.filerGroup", "", "share metadata with other filers in the same filerGroup") filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection") diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 10b54d58f..ff2887ede 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -14,6 +14,7 @@ import ( "time" "github.com/seaweedfs/seaweedfs/weed/stats" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" "github.com/seaweedfs/seaweedfs/weed/telemetry" "github.com/seaweedfs/seaweedfs/weed/cluster" @@ -34,6 +35,7 @@ import ( util_http "github.com/seaweedfs/seaweedfs/weed/util/http" "github.com/seaweedfs/seaweedfs/weed/util/version" "github.com/seaweedfs/seaweedfs/weed/wdclient" + "math" ) const ( @@ -437,3 +439,71 @@ func (ms *MasterServer) Reload() { util.StringSplit(v.GetString("guard.white_list"), ",")...), ) } + +func CollectCollectionsStats(parent context.Context, ms *MasterServer, interval time.Duration) (shutdown func(ctx context.Context) error) { + ctx, cancel := context.WithCancel(parent) + done := make(chan struct{}) + + go func() { + defer close(done) + ticker := time.NewTicker(interval) + defer ticker.Stop() + + req := &master_pb.VolumeListRequest{} + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + volumeList, err := ms.VolumeList(ctx, req) + if err != nil { + glog.Errorf("collect volume list: %v", err) + continue + } + if volumeList == nil || volumeList.TopologyInfo == nil { + continue + } + + collectionInfos := make(map[string]*shell.CollectionInfo) + for _, dc := range volumeList.TopologyInfo.DataCenterInfos { + for _, r := range dc.RackInfos { + for _, dn := range r.DataNodeInfos { + for _, diskInfo := range dn.DiskInfos { + for _, vif := range diskInfo.VolumeInfos { + c := vif.Collection + cif := collectionInfos[c] + if cif == nil { + cif = &shell.CollectionInfo{} + collectionInfos[c] = cif + } + replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vif.ReplicaPlacement)) + copyCount := float64(replicaPlacement.GetCopyCount()) + cif.Size += float64(vif.Size) / copyCount + cif.DeleteCount += float64(vif.DeleteCount) / copyCount + cif.FileCount += float64(vif.FileCount) / copyCount + cif.DeletedByteCount += float64(vif.DeletedByteCount) / copyCount + cif.VolumeCount++ + } + } + } + } + } + + for bucket, v := range collectionInfos { + stats.S3BucketFileCount.WithLabelValues(bucket).Set(math.Round(v.FileCount - v.DeleteCount)) + stats.S3BucketSize.WithLabelValues(bucket).Set(math.Round(v.Size - v.DeletedByteCount)) + } + } + } + }() + + return func(ctx context.Context) error { + cancel() + select { + case <-done: + return nil + case <-ctx.Done(): + return ctx.Err() + } + } +} diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go index 2723e253f..1af98b51f 100644 --- a/weed/stats/metrics.go +++ b/weed/stats/metrics.go @@ -382,6 +382,22 @@ var ( Name: "uploaded_objects", Help: "Number of objects uploaded in each bucket.", }, []string{"bucket"}) + + S3BucketFileCount = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "s3", + Name: "bucket_file_count", + Help: "Number files in bucket", + }, []string{"bucket"}) + + S3BucketSize = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "s3", + Name: "bucket_size", + Help: "Bucket size in bytes", + }, []string{"bucket"}) ) func init() { @@ -433,6 +449,8 @@ func init() { Gather.MustRegister(S3BucketTrafficSentBytesCounter) Gather.MustRegister(S3DeletedObjectsCounter) Gather.MustRegister(S3UploadedObjectsCounter) + Gather.MustRegister(S3BucketFileCount) + Gather.MustRegister(S3BucketSize) go bucketMetricTTLControl() }