Browse Source

[stats]: added s3 bucket stats

pull/7236/head
ptukha 1 month ago
parent
commit
12a6a874e3
  1. 33
      weed/command/master.go
  2. 1
      weed/command/server.go
  3. 50
      weed/server/master_server.go
  4. 18
      weed/stats/metrics.go

33
weed/command/master.go

@ -49,21 +49,22 @@ type MasterOptions struct {
volumePreallocate *bool
maxParallelVacuumPerServer *int
// pulseSeconds *int
defaultReplication *string
garbageThreshold *float64
whiteList *string
disableHttp *bool
metricsAddress *string
metricsIntervalSec *int
raftResumeState *bool
metricsHttpPort *int
metricsHttpIp *string
heartbeatInterval *time.Duration
electionTimeout *time.Duration
raftHashicorp *bool
raftBootstrap *bool
telemetryUrl *string
telemetryEnabled *bool
defaultReplication *string
garbageThreshold *float64
whiteList *string
disableHttp *bool
metricsAddress *string
metricsIntervalSec *int
raftResumeState *bool
metricsHttpPort *int
metricsHttpIp *string
heartbeatInterval *time.Duration
electionTimeout *time.Duration
raftHashicorp *bool
raftBootstrap *bool
telemetryUrl *string
telemetryEnabled *bool
intervalToCollectStats *time.Duration
}
func init() {
@ -93,6 +94,7 @@ func init() {
m.raftBootstrap = cmdMaster.Flag.Bool("raftBootstrap", false, "Whether to bootstrap the Raft cluster")
m.telemetryUrl = cmdMaster.Flag.String("telemetry.url", "https://telemetry.seaweedfs.com/api/collect", "telemetry server URL to send usage statistics")
m.telemetryEnabled = cmdMaster.Flag.Bool("telemetry", false, "enable telemetry reporting")
m.intervalToCollectStats = cmdMaster.Flag.Duration("intervalToCollectStats", time.Second*120, "interval to collect s3 stats")
}
var cmdMaster = &Command{
@ -244,6 +246,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
}
go ms.MasterClient.KeepConnectedToMaster(context.Background())
go weed_server.CollectCollectionsStats(context.Background(), ms, *masterOption.intervalToCollectStats)
// start http server
var (

1
weed/command/server.go

@ -108,6 +108,7 @@ func init() {
masterOptions.electionTimeout = cmdServer.Flag.Duration("master.electionTimeout", 10*time.Second, "election timeout of master servers")
masterOptions.telemetryUrl = cmdServer.Flag.String("master.telemetry.url", "https://telemetry.seaweedfs.com/api/collect", "telemetry server URL to send usage statistics")
masterOptions.telemetryEnabled = cmdServer.Flag.Bool("master.telemetry", false, "enable telemetry reporting")
masterOptions.intervalToCollectStats = cmdServer.Flag.Duration("master.intervalToCollectStats", time.Second*120, "interval to collect s3 stats")
filerOptions.filerGroup = cmdServer.Flag.String("filer.filerGroup", "", "share metadata with other filers in the same filerGroup")
filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection")

50
weed/server/master_server.go

@ -14,6 +14,7 @@ import (
"time"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/telemetry"
"github.com/seaweedfs/seaweedfs/weed/cluster"
@ -57,7 +58,7 @@ type MasterOption struct {
IsFollower bool
TelemetryUrl string
TelemetryEnabled bool
VolumeGrowthDisabled bool
VolumeGrowthDisabled bool
}
type MasterServer struct {
@ -434,3 +435,50 @@ func (ms *MasterServer) Reload() {
util.StringSplit(v.GetString("guard.white_list"), ",")...),
)
}
func CollectCollectionsStats(ctx context.Context, ms *MasterServer, intervalToCollect time.Duration) {
ticker := time.NewTicker(intervalToCollect)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
volumeList, err := ms.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
if err != nil {
glog.Errorf("collect volume list: %v", err)
}
collectionInfos := make(map[string]*shell.CollectionInfo)
for _, dc := range volumeList.TopologyInfo.DataCenterInfos {
for _, r := range dc.RackInfos {
for _, dn := range r.DataNodeInfos {
for _, diskInfo := range dn.DiskInfos {
for _, vif := range diskInfo.VolumeInfos {
c := vif.Collection
cif, found := collectionInfos[c]
if !found {
cif = &shell.CollectionInfo{}
collectionInfos[c] = cif
}
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vif.ReplicaPlacement))
copyCount := float64(replicaPlacement.GetCopyCount())
cif.Size += float64(vif.Size) / copyCount
cif.DeleteCount += float64(vif.DeleteCount) / copyCount
cif.FileCount += float64(vif.FileCount) / copyCount
cif.DeletedByteCount += float64(vif.DeletedByteCount) / copyCount
cif.VolumeCount++
}
}
}
}
}
for k, v := range collectionInfos {
stats.S3BucketFileCount.WithLabelValues(k).Set(v.FileCount - v.DeleteCount)
stats.S3BucketSize.WithLabelValues(k).Set(v.Size - v.DeletedByteCount)
}
}
}
}

18
weed/stats/metrics.go

@ -382,6 +382,22 @@ var (
Name: "uploaded_objects",
Help: "Number of objects uploaded in each bucket.",
}, []string{"bucket"})
S3BucketFileCount = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: "s3",
Name: "bucket_file_count",
Help: "Number files in bucket",
}, []string{"bucket"})
S3BucketSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: "s3",
Name: "bucket_size",
Help: "Bucket size in bytes",
}, []string{"bucket"})
)
func init() {
@ -433,6 +449,8 @@ func init() {
Gather.MustRegister(S3BucketTrafficSentBytesCounter)
Gather.MustRegister(S3DeletedObjectsCounter)
Gather.MustRegister(S3UploadedObjectsCounter)
Gather.MustRegister(S3BucketFileCount)
Gather.MustRegister(S3BucketSize)
go bucketMetricTTLControl()
}

Loading…
Cancel
Save