
metrics with generation

add-ec-vacuum
chrislu 4 months ago
parent commit 3087da07db
  1. weed/stats/metrics.go (23)
  2. weed/storage/erasure_coding/ec_shard.go (9)
  3. weed/storage/store.go (4)
  4. weed/storage/store_ec.go (18)
  5. weed/storage/volume.go (2)
  6. weed/storage/volume_loading.go (2)
  7. weed/storage/volume_vacuum.go (2)
  8. weed/topology/topology_ec.go (66)
  9. weed/topology/topology_event_handling.go (2)

weed/stats/metrics.go (23)

@@ -244,7 +244,7 @@ var (
 			Subsystem: "volumeServer",
 			Name:      "volumes",
 			Help:      "Number of volumes or shards.",
-		}, []string{"collection", "type"})
+		}, []string{"collection", "type", "generation"})
 	VolumeServerReadOnlyVolumeGauge = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
@@ -268,7 +268,7 @@ var (
 			Subsystem: "volumeServer",
 			Name:      "total_disk_size",
 			Help:      "Actual disk size used by volumes.",
-		}, []string{"collection", "type"})
+		}, []string{"collection", "type", "generation"})
 	VolumeServerResourceGauge = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
@@ -278,6 +278,23 @@
 			Help: "Resource usage",
 		}, []string{"name", "type"})
+
+	// EC-specific generation metrics
+	MasterEcVolumeGenerationGauge = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: Namespace,
+			Subsystem: "master",
+			Name:      "ec_volume_generations",
+			Help:      "Number of EC volumes by generation and activity status.",
+		}, []string{"collection", "generation", "active"})
+
+	MasterEcShardGenerationGauge = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: Namespace,
+			Subsystem: "master",
+			Name:      "ec_shard_generations",
+			Help:      "Number of EC shards by generation and activity status.",
+		}, []string{"collection", "generation", "active"})
 	VolumeServerConcurrentDownloadLimit = prometheus.NewGauge(
 		prometheus.GaugeOpts{
 			Namespace: Namespace,
@@ -395,6 +412,8 @@ func init() {
 	Gather.MustRegister(MasterVolumeLayoutCrowded)
 	Gather.MustRegister(MasterPickForWriteErrorCounter)
 	Gather.MustRegister(MasterBroadcastToFullErrorCounter)
+	Gather.MustRegister(MasterEcVolumeGenerationGauge)
+	Gather.MustRegister(MasterEcShardGenerationGauge)

 	Gather.MustRegister(FilerRequestCounter)
 	Gather.MustRegister(FilerHandlerCounter)
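
Since VolumeServerVolumeGauge and VolumeServerDiskSizeGauge now declare a third "generation" label, every WithLabelValues call on them has to pass three values; client_golang panics on a label-count mismatch, which is why the other files in this commit update their call sites. A minimal sketch of the new call shape, assuming this commit is applied (the collection name "pics" and generation 2 are illustrative values, not taken from the commit):

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/stats"
)

func main() {
	// EC shards label the gauge with their own generation.
	generationLabel := fmt.Sprintf("%d", uint32(2))
	stats.VolumeServerVolumeGauge.WithLabelValues("pics", "ec_shards", generationLabel).Inc()

	// Regular volumes have no EC generation, so the updated call sites in
	// volume.go, volume_loading.go and volume_vacuum.go pass a literal "0".
	stats.VolumeServerVolumeGauge.WithLabelValues("pics", "volume", "0").Inc()
}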

weed/storage/erasure_coding/ec_shard.go (9)

@@ -23,11 +23,12 @@ type EcVolumeShard struct {
 	ecdFile     *os.File
 	ecdFileSize int64
 	DiskType    types.DiskType
+	Generation  uint32 // generation for metrics labeling
 }

 func NewEcVolumeShard(diskType types.DiskType, dirname string, collection string, id needle.VolumeId, shardId ShardId, generation uint32) (v *EcVolumeShard, e error) {
-	v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId, DiskType: diskType}
+	v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId, DiskType: diskType, Generation: generation}

 	baseFileName := v.FileNameWithGeneration(generation)
@@ -51,11 +52,13 @@ func NewEcVolumeShard(diskType types.DiskType, dirname string, collection string
 }

 func (shard *EcVolumeShard) Mount() {
-	stats.VolumeServerVolumeGauge.WithLabelValues(shard.Collection, "ec_shards").Inc()
+	generationLabel := fmt.Sprintf("%d", shard.Generation)
+	stats.VolumeServerVolumeGauge.WithLabelValues(shard.Collection, "ec_shards", generationLabel).Inc()
 }

 func (shard *EcVolumeShard) Unmount() {
-	stats.VolumeServerVolumeGauge.WithLabelValues(shard.Collection, "ec_shards").Dec()
+	generationLabel := fmt.Sprintf("%d", shard.Generation)
+	stats.VolumeServerVolumeGauge.WithLabelValues(shard.Collection, "ec_shards", generationLabel).Dec()
 }

 func (shard *EcVolumeShard) Size() int64 {

weed/storage/store.go (4)

@@ -359,11 +359,11 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
 	}

 	for col, size := range collectionVolumeSize {
-		stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "normal").Set(float64(size))
+		stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "normal", "0").Set(float64(size))
 	}

 	for col, deletedBytes := range collectionVolumeDeletedBytes {
-		stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "deleted_bytes").Set(float64(deletedBytes))
+		stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "deleted_bytes", "0").Set(float64(deletedBytes))
 	}

 	for col, types := range collectionVolumeReadOnlyCount {

weed/storage/store_ec.go (18)

@@ -24,21 +24,31 @@ import (
 func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat {
 	var ecShardMessages []*master_pb.VolumeEcShardInformationMessage
 	collectionEcShardSize := make(map[string]int64)
+	// Track sizes by collection+generation combination
+	collectionGenerationEcShardSize := make(map[string]map[uint32]int64)
 	for diskId, location := range s.Locations {
 		location.ecVolumesLock.RLock()
 		for _, ecShards := range location.ecVolumes {
 			ecShardMessages = append(ecShardMessages, ecShards.ToVolumeEcShardInformationMessage(uint32(diskId))...)
+			// Initialize collection map if needed
+			if collectionGenerationEcShardSize[ecShards.Collection] == nil {
+				collectionGenerationEcShardSize[ecShards.Collection] = make(map[uint32]int64)
+			}
 			for _, ecShard := range ecShards.Shards {
 				collectionEcShardSize[ecShards.Collection] += ecShard.Size()
+				collectionGenerationEcShardSize[ecShards.Collection][ecShards.Generation] += ecShard.Size()
 			}
 		}
 		location.ecVolumesLock.RUnlock()
 	}

-	for col, size := range collectionEcShardSize {
-		stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "ec").Set(float64(size))
+	// Update metrics with generation labels
+	for col, generationSizes := range collectionGenerationEcShardSize {
+		for generation, size := range generationSizes {
+			generationLabel := fmt.Sprintf("%d", generation)
+			stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "ec", generationLabel).Set(float64(size))
+		}
 	}

 	return &master_pb.Heartbeat{
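
With the per-generation accumulation above, each (collection, generation) pair becomes its own sample of the "ec" disk-size gauge rather than one aggregate per collection. A hedged sketch of how such a sample could be read back with the prometheus testutil helper; the test package, collection "pics", and generation "3" are illustrative, not part of this commit:

package storage_test

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"

	"github.com/seaweedfs/seaweedfs/weed/stats"
)

func TestEcDiskSizeByGeneration(t *testing.T) {
	// Reads back the single (collection="pics", type="ec", generation="3")
	// sample; a heartbeat would have populated it via Set.
	gauge := stats.VolumeServerDiskSizeGauge.WithLabelValues("pics", "ec", "3")
	t.Logf("ec disk size for generation 3: %.0f bytes", testutil.ToFloat64(gauge))
}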

weed/storage/volume.go (2)

@@ -248,7 +248,7 @@ func (v *Volume) doClose() {
 			glog.Warningf("Volume Close fail to sync volume %d", v.Id)
 		}
 		v.DataBackend = nil
-		stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume").Dec()
+		stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume", "0").Dec()
 	}
 }

weed/storage/volume_loading.go (2)

@@ -216,7 +216,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
 		}
 	}
-	stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume").Inc()
+	stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume", "0").Inc()

 	if err == nil {
 		hasLoadedVolume = true

weed/storage/volume_vacuum.go (2)

@@ -136,7 +136,7 @@ func (v *Volume) CommitCompact() error {
 		}
 	}
 	v.DataBackend = nil
-	stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume").Dec()
+	stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume", "0").Dec()

 	var e error
 	if e = v.makeupDiff(v.FileName(".cpd"), v.FileName(".cpx"), v.FileName(".dat"), v.FileName(".idx")); e != nil {

weed/topology/topology_ec.go (66)

@ -6,6 +6,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)
@@ -348,6 +349,71 @@ func (t *Topology) LookupEcShardsWithFallback(vid needle.VolumeId, requestedGene
 	return nil, 0, false
 }

+// UpdateEcGenerationMetrics updates prometheus metrics with current EC volume generation information
+func (t *Topology) UpdateEcGenerationMetrics() {
+	t.ecShardMapLock.RLock()
+	defer t.ecShardMapLock.RUnlock()
+	t.ecActiveGenerationMapLock.RLock()
+	defer t.ecActiveGenerationMapLock.RUnlock()
+
+	// Count volumes and shards by collection, generation, and active status
+	volumeCountsByCollection := make(map[string]map[uint32]map[bool]int)
+	shardCountsByCollection := make(map[string]map[uint32]map[bool]int)
+
+	// Initialize counting maps
+	for key, ecShardLocs := range t.ecShardMap {
+		collection := ecShardLocs.Collection
+		generation := key.Generation
+
+		if volumeCountsByCollection[collection] == nil {
+			volumeCountsByCollection[collection] = make(map[uint32]map[bool]int)
+		}
+		if volumeCountsByCollection[collection][generation] == nil {
+			volumeCountsByCollection[collection][generation] = make(map[bool]int)
+		}
+		if shardCountsByCollection[collection] == nil {
+			shardCountsByCollection[collection] = make(map[uint32]map[bool]int)
+		}
+		if shardCountsByCollection[collection][generation] == nil {
+			shardCountsByCollection[collection][generation] = make(map[bool]int)
+		}
+
+		// Check if this generation is active for this volume
+		activeGeneration, hasActiveGen := t.ecActiveGenerationMap[key.VolumeId]
+		isActive := hasActiveGen && activeGeneration == generation
+
+		// Count this volume
+		volumeCountsByCollection[collection][generation][isActive]++
+
+		// Count shards in this volume
+		shardCount := len(ecShardLocs.Locations)
+		shardCountsByCollection[collection][generation][isActive] += shardCount
+	}
+
+	// Update volume metrics
+	for collection, generationMap := range volumeCountsByCollection {
+		for generation, activeMap := range generationMap {
+			generationLabel := fmt.Sprintf("%d", generation)
+			for isActive, count := range activeMap {
+				activeLabel := fmt.Sprintf("%t", isActive)
+				stats.MasterEcVolumeGenerationGauge.WithLabelValues(collection, generationLabel, activeLabel).Set(float64(count))
+			}
+		}
+	}
+
+	// Update shard metrics
+	for collection, generationMap := range shardCountsByCollection {
+		for generation, activeMap := range generationMap {
+			generationLabel := fmt.Sprintf("%d", generation)
+			for isActive, count := range activeMap {
+				activeLabel := fmt.Sprintf("%t", isActive)
+				stats.MasterEcShardGenerationGauge.WithLabelValues(collection, generationLabel, activeLabel).Set(float64(count))
+			}
+		}
+	}
+}
+
 // ValidateEcGenerationReadiness checks if an EC generation has sufficient shards for activation
 // Returns true if the generation has at least erasure_coding.DataShardsCount shards available
 func (t *Topology) ValidateEcGenerationReadiness(vid needle.VolumeId, generation uint32) (ready bool, availableShards int, err error) {
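
A toy walk-through of the counting scheme in UpdateEcGenerationMetrics, using plain values instead of the topology's internal maps and dropping the collection dimension for brevity: one volume still has shards registered under generation 0 while generation 1 is the active one, so one volume is counted as active=false for generation 0 and one as active=true for generation 1. All names and numbers below are made up for illustration:

package main

import "fmt"

func main() {
	type key struct {
		volumeId   uint32
		generation uint32
	}
	// key -> number of shard locations registered for that generation
	shardMap := map[key]int{
		{volumeId: 7, generation: 0}: 14,
		{volumeId: 7, generation: 1}: 14,
	}
	activeGeneration := map[uint32]uint32{7: 1}

	volumeCounts := make(map[uint32]map[bool]int) // generation -> active -> volumes
	shardCounts := make(map[uint32]map[bool]int)  // generation -> active -> shards
	for k, shards := range shardMap {
		isActive := activeGeneration[k.volumeId] == k.generation
		if volumeCounts[k.generation] == nil {
			volumeCounts[k.generation] = make(map[bool]int)
			shardCounts[k.generation] = make(map[bool]int)
		}
		volumeCounts[k.generation][isActive]++
		shardCounts[k.generation][isActive] += shards
	}

	// Expected output (iteration order may vary):
	//   generation=0 active=false volumes=1 shards=14
	//   generation=1 active=true volumes=1 shards=14
	for generation, activeMap := range volumeCounts {
		for isActive, volumes := range activeMap {
			fmt.Printf("generation=%d active=%t volumes=%d shards=%d\n",
				generation, isActive, volumes, shardCounts[generation][isActive])
		}
	}
}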

weed/topology/topology_event_handling.go (2)

@@ -29,6 +29,8 @@ func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, g
 			if !t.isDisableVacuum {
 				t.Vacuum(grpcDialOption, garbageThreshold, concurrentVacuumLimitPerVolumeServer, 0, "", preallocate, true)
 			}
+			// Update EC generation metrics periodically
+			t.UpdateEcGenerationMetrics()
 		} else {
 			stats.MasterReplicaPlacementMismatch.Reset()
 		}
