Browse Source

Merge 7cfe6e20a5 into d745e6e41d

pull/7236/merge
Aleksey 1 month ago
committed by GitHub
parent
commit
fb3d1190ad
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 46
      weed/command/master.go
  2. 1
      weed/command/server.go
  3. 70
      weed/server/master_server.go
  4. 18
      weed/stats/metrics.go

46
weed/command/master.go

@ -49,21 +49,22 @@ type MasterOptions struct {
volumePreallocate *bool volumePreallocate *bool
maxParallelVacuumPerServer *int maxParallelVacuumPerServer *int
// pulseSeconds *int // pulseSeconds *int
defaultReplication *string
garbageThreshold *float64
whiteList *string
disableHttp *bool
metricsAddress *string
metricsIntervalSec *int
raftResumeState *bool
metricsHttpPort *int
metricsHttpIp *string
heartbeatInterval *time.Duration
electionTimeout *time.Duration
raftHashicorp *bool
raftBootstrap *bool
telemetryUrl *string
telemetryEnabled *bool
defaultReplication *string
garbageThreshold *float64
whiteList *string
disableHttp *bool
metricsAddress *string
metricsIntervalSec *int
raftResumeState *bool
metricsHttpPort *int
metricsHttpIp *string
heartbeatInterval *time.Duration
electionTimeout *time.Duration
raftHashicorp *bool
raftBootstrap *bool
telemetryUrl *string
telemetryEnabled *bool
intervalToCollectStats *time.Duration
} }
func init() { func init() {
@ -93,6 +94,7 @@ func init() {
m.raftBootstrap = cmdMaster.Flag.Bool("raftBootstrap", false, "Whether to bootstrap the Raft cluster") m.raftBootstrap = cmdMaster.Flag.Bool("raftBootstrap", false, "Whether to bootstrap the Raft cluster")
m.telemetryUrl = cmdMaster.Flag.String("telemetry.url", "https://telemetry.seaweedfs.com/api/collect", "telemetry server URL to send usage statistics") m.telemetryUrl = cmdMaster.Flag.String("telemetry.url", "https://telemetry.seaweedfs.com/api/collect", "telemetry server URL to send usage statistics")
m.telemetryEnabled = cmdMaster.Flag.Bool("telemetry", false, "enable telemetry reporting") m.telemetryEnabled = cmdMaster.Flag.Bool("telemetry", false, "enable telemetry reporting")
m.intervalToCollectStats = cmdMaster.Flag.Duration("intervalToCollectStats", time.Second*120, "interval to collect s3 stats")
} }
var cmdMaster = &Command{ var cmdMaster = &Command{
@ -229,6 +231,8 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
} }
go grpcS.Serve(grpcL) go grpcS.Serve(grpcL)
rootCtx, rootCancel := context.WithCancel(context.Background())
timeSleep := 1500 * time.Millisecond timeSleep := 1500 * time.Millisecond
if !*masterOption.raftHashicorp { if !*masterOption.raftHashicorp {
go func() { go func() {
@ -243,7 +247,8 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
}() }()
} }
go ms.MasterClient.KeepConnectedToMaster(context.Background())
go ms.MasterClient.KeepConnectedToMaster(rootCtx)
statsShutdown := weed_server.CollectCollectionsStats(rootCtx, ms, *masterOption.intervalToCollectStats)
// start http server // start http server
var ( var (
@ -283,6 +288,15 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
grace.OnInterrupt(ms.Shutdown) grace.OnInterrupt(ms.Shutdown)
grace.OnInterrupt(grpcS.Stop) grace.OnInterrupt(grpcS.Stop)
grace.OnInterrupt(func() {
rootCancel()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := statsShutdown(ctx); err != nil {
glog.Errorf("graceful shutdown of collections stats: %v", err)
}
})
grace.OnReload(func() { grace.OnReload(func() {
if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader { if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader {
ms.Topo.HashicorpRaft.LeadershipTransfer() ms.Topo.HashicorpRaft.LeadershipTransfer()

1
weed/command/server.go

@ -109,6 +109,7 @@ func init() {
masterOptions.electionTimeout = cmdServer.Flag.Duration("master.electionTimeout", 10*time.Second, "election timeout of master servers") masterOptions.electionTimeout = cmdServer.Flag.Duration("master.electionTimeout", 10*time.Second, "election timeout of master servers")
masterOptions.telemetryUrl = cmdServer.Flag.String("master.telemetry.url", "https://telemetry.seaweedfs.com/api/collect", "telemetry server URL to send usage statistics") masterOptions.telemetryUrl = cmdServer.Flag.String("master.telemetry.url", "https://telemetry.seaweedfs.com/api/collect", "telemetry server URL to send usage statistics")
masterOptions.telemetryEnabled = cmdServer.Flag.Bool("master.telemetry", false, "enable telemetry reporting") masterOptions.telemetryEnabled = cmdServer.Flag.Bool("master.telemetry", false, "enable telemetry reporting")
masterOptions.intervalToCollectStats = cmdServer.Flag.Duration("master.intervalToCollectStats", time.Second*120, "interval to collect s3 stats")
filerOptions.filerGroup = cmdServer.Flag.String("filer.filerGroup", "", "share metadata with other filers in the same filerGroup") filerOptions.filerGroup = cmdServer.Flag.String("filer.filerGroup", "", "share metadata with other filers in the same filerGroup")
filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection") filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection")

70
weed/server/master_server.go

@ -14,6 +14,7 @@ import (
"time" "time"
"github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/telemetry" "github.com/seaweedfs/seaweedfs/weed/telemetry"
"github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/cluster"
@ -34,6 +35,7 @@ import (
util_http "github.com/seaweedfs/seaweedfs/weed/util/http" util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"github.com/seaweedfs/seaweedfs/weed/util/version" "github.com/seaweedfs/seaweedfs/weed/util/version"
"github.com/seaweedfs/seaweedfs/weed/wdclient" "github.com/seaweedfs/seaweedfs/weed/wdclient"
"math"
) )
const ( const (
@ -437,3 +439,71 @@ func (ms *MasterServer) Reload() {
util.StringSplit(v.GetString("guard.white_list"), ",")...), util.StringSplit(v.GetString("guard.white_list"), ",")...),
) )
} }
func CollectCollectionsStats(parent context.Context, ms *MasterServer, interval time.Duration) (shutdown func(ctx context.Context) error) {
ctx, cancel := context.WithCancel(parent)
done := make(chan struct{})
go func() {
defer close(done)
ticker := time.NewTicker(interval)
defer ticker.Stop()
req := &master_pb.VolumeListRequest{}
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
volumeList, err := ms.VolumeList(ctx, req)
if err != nil {
glog.Errorf("collect volume list: %v", err)
continue
}
if volumeList == nil || volumeList.TopologyInfo == nil {
continue
}
collectionInfos := make(map[string]*shell.CollectionInfo)
for _, dc := range volumeList.TopologyInfo.DataCenterInfos {
for _, r := range dc.RackInfos {
for _, dn := range r.DataNodeInfos {
for _, diskInfo := range dn.DiskInfos {
for _, vif := range diskInfo.VolumeInfos {
c := vif.Collection
cif := collectionInfos[c]
if cif == nil {
cif = &shell.CollectionInfo{}
collectionInfos[c] = cif
}
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vif.ReplicaPlacement))
copyCount := float64(replicaPlacement.GetCopyCount())
cif.Size += float64(vif.Size) / copyCount
cif.DeleteCount += float64(vif.DeleteCount) / copyCount
cif.FileCount += float64(vif.FileCount) / copyCount
cif.DeletedByteCount += float64(vif.DeletedByteCount) / copyCount
cif.VolumeCount++
}
}
}
}
}
for bucket, v := range collectionInfos {
stats.S3BucketFileCount.WithLabelValues(bucket).Set(math.Round(v.FileCount - v.DeleteCount))
stats.S3BucketSize.WithLabelValues(bucket).Set(math.Round(v.Size - v.DeletedByteCount))
}
}
}
}()
return func(ctx context.Context) error {
cancel()
select {
case <-done:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
}

18
weed/stats/metrics.go

@ -382,6 +382,22 @@ var (
Name: "uploaded_objects", Name: "uploaded_objects",
Help: "Number of objects uploaded in each bucket.", Help: "Number of objects uploaded in each bucket.",
}, []string{"bucket"}) }, []string{"bucket"})
S3BucketFileCount = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: "s3",
Name: "bucket_file_count",
Help: "Number files in bucket",
}, []string{"bucket"})
S3BucketSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: "s3",
Name: "bucket_size",
Help: "Bucket size in bytes",
}, []string{"bucket"})
) )
func init() { func init() {
@ -433,6 +449,8 @@ func init() {
Gather.MustRegister(S3BucketTrafficSentBytesCounter) Gather.MustRegister(S3BucketTrafficSentBytesCounter)
Gather.MustRegister(S3DeletedObjectsCounter) Gather.MustRegister(S3DeletedObjectsCounter)
Gather.MustRegister(S3UploadedObjectsCounter) Gather.MustRegister(S3UploadedObjectsCounter)
Gather.MustRegister(S3BucketFileCount)
Gather.MustRegister(S3BucketSize)
go bucketMetricTTLControl() go bucketMetricTTLControl()
} }

Loading…
Cancel
Save