
Fix file stat collection metric bug for the `cluster.status` command. (#8302)

When the `--files` flag is present, `cluster.status` scrapes file metrics
from volume servers to provide detailed per-volume stats. However, the
progress indicator was not updated on every code path, so the command could
complete before the indicator reached 100%.
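
The fix moves the progress update into a deferred call at the top of each
worker, so it fires on every exit path, including early returns. Below is a
minimal, self-contained sketch of the pattern; it is not the actual
SeaweedFS code, and the early-return condition is only a stand-in for the
"EC volume already processed" case:

	package main

	import (
		"fmt"
		"sync"
	)

	func main() {
		var writerMu sync.Mutex
		var progressDone, progressTotal uint64 = 0, 10
		updateProgress := func() {
			writerMu.Lock()
			defer writerMu.Unlock()
			progressDone++
			fmt.Printf("collecting file stats: %d%%\r", progressDone*100/progressTotal)
		}

		var wg sync.WaitGroup
		for i := 0; i < int(progressTotal); i++ {
			wg.Add(1)
			go func(i int) {
				defer wg.Done()
				defer updateProgress() // runs even when the worker returns early
				if i%2 == 0 {
					return // early return; without the defer, this path skipped the update
				}
				// ... collect stats from a volume server here ...
			}(i)
		}
		wg.Wait()
		fmt.Printf("\ndone: %d/%d\n", progressDone, progressTotal)
	}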
Lisandro Pin · commit 221bd237c4
weed/shell/command_cluster_status.go (33 changed lines)

@@ -214,13 +214,13 @@ func (sp *ClusterStatusPrinter) loadFileStats(commandEnv *CommandEnv) error {
 	sp.regularVolumesStats = RegularVolumesStats{}
 	sp.ecVolumesStats = EcVolumesStats{}
-	var mu sync.Mutex
+	var statsMu, writerMu sync.Mutex
 	var progressTotal, progressDone uint64
 	ewg := NewErrorWaitGroup(sp.maxParallelization)
 	updateProgress := func() {
-		mu.Lock()
-		defer mu.Unlock()
+		writerMu.Lock()
+		defer writerMu.Unlock()
 		progressDone++
 		sp.write("collecting file stats: %s \r", sp.uint64Pct(progressDone, progressTotal))
@@ -230,13 +230,15 @@ func (sp *ClusterStatusPrinter) loadFileStats(commandEnv *CommandEnv) error {
 	for _, ri := range dci.RackInfos {
 		for _, dni := range ri.DataNodeInfos {
 			for _, d := range dni.DiskInfos {
-				mu.Lock()
+				statsMu.Lock()
 				progressTotal += uint64(len(d.VolumeInfos))
 				progressTotal += uint64(len(d.EcShardInfos))
-				mu.Unlock()
+				statsMu.Unlock()
 				for _, v := range d.VolumeInfos {
 					ewg.Add(func() error {
+						defer updateProgress()
+						// Collect regular volume stats
 						err := operation.WithVolumeServerClient(false, pb.NewServerAddressWithGrpcPort(dni.Id, int(dni.GrpcPort)), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
 							resp, reqErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{
@@ -246,8 +248,8 @@ func (sp *ClusterStatusPrinter) loadFileStats(commandEnv *CommandEnv) error {
 							return reqErr
 						}
-						mu.Lock()
-						defer mu.Unlock()
+						statsMu.Lock()
+						defer statsMu.Unlock()
 						if resp != nil {
 							if _, ok := sp.regularVolumesStats[v.Id]; !ok {
 								sp.regularVolumesStats[v.Id] = []*VolumeReplicaStats{}
@@ -263,26 +265,24 @@ func (sp *ClusterStatusPrinter) loadFileStats(commandEnv *CommandEnv) error {
 							return nil
 						})
-						updateProgress()
 						return err
 					})
 				}
 				for _, eci := range d.EcShardInfos {
 					ewg.Add(func() error {
-						// Collect EC shard stats
-						var err error
-						mu.Lock()
+						defer updateProgress()
+						// Collect EC shard stats
+						statsMu.Lock()
 						_, ok := sp.ecVolumesStats[eci.Id]
-						mu.Unlock()
+						statsMu.Unlock()
 						if ok {
 							// this EC volume has been already processed, likely on a different node
 							return nil
 						}
-						err = operation.WithVolumeServerClient(false, pb.NewServerAddressWithGrpcPort(dni.Id, int(dni.GrpcPort)), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+						err := operation.WithVolumeServerClient(false, pb.NewServerAddressWithGrpcPort(dni.Id, int(dni.GrpcPort)), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
 							resp, reqErr := volumeServerClient.VolumeEcShardsInfo(context.Background(), &volume_server_pb.VolumeEcShardsInfoRequest{
 								VolumeId: uint32(eci.Id),
 							})
@@ -290,8 +290,8 @@ func (sp *ClusterStatusPrinter) loadFileStats(commandEnv *CommandEnv) error {
 							return reqErr
 						}
-						mu.Lock()
-						defer mu.Unlock()
+						statsMu.Lock()
+						defer statsMu.Unlock()
 						if resp != nil {
 							sp.ecVolumesStats[eci.Id] = &EcVolumeStats{
 								VolumeId: eci.Id,
@@ -303,7 +303,6 @@ func (sp *ClusterStatusPrinter) loadFileStats(commandEnv *CommandEnv) error {
 							return nil
 						})
-						updateProgress()
 						return err
 					})
 				}
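
The change also splits the original single mutex `mu` into `statsMu`, which
guards the shared stats maps, and `writerMu`, which serializes writes of the
progress line, so a slow terminal write no longer blocks stats aggregation.
A rough sketch of that split (illustrative names, not the SeaweedFS API):

	package main

	import (
		"fmt"
		"sync"
	)

	var (
		statsMu  sync.Mutex         // guards the shared stats map
		writerMu sync.Mutex         // serializes progress output
		stats    = map[uint32]int{} // stand-in for the per-volume stats maps
	)

	func recordStat(id uint32) {
		statsMu.Lock()
		defer statsMu.Unlock()
		stats[id]++
	}

	func printProgress(done, total uint64) {
		writerMu.Lock()
		defer writerMu.Unlock()
		fmt.Printf("collecting file stats: %d/%d\r", done, total)
	}

	func main() {
		recordStat(42)
		printProgress(1, 1)
	}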
