diff --git a/weed/shell/command_cluster_status.go b/weed/shell/command_cluster_status.go
index 9bf7b1591..2f0de7703 100644
--- a/weed/shell/command_cluster_status.go
+++ b/weed/shell/command_cluster_status.go
@@ -214,13 +214,13 @@ func (sp *ClusterStatusPrinter) loadFileStats(commandEnv *CommandEnv) error {
 	sp.regularVolumesStats = RegularVolumesStats{}
 	sp.ecVolumesStats = EcVolumesStats{}
 
-	var mu sync.Mutex
+	var statsMu, writerMu sync.Mutex
 	var progressTotal, progressDone uint64
 	ewg := NewErrorWaitGroup(sp.maxParallelization)
 
 	updateProgress := func() {
-		mu.Lock()
-		defer mu.Unlock()
+		writerMu.Lock()
+		defer writerMu.Unlock()
 
 		progressDone++
 		sp.write("collecting file stats: %s \r", sp.uint64Pct(progressDone, progressTotal))
@@ -230,13 +230,15 @@ func (sp *ClusterStatusPrinter) loadFileStats(commandEnv *CommandEnv) error {
 		for _, ri := range dci.RackInfos {
 			for _, dni := range ri.DataNodeInfos {
 				for _, d := range dni.DiskInfos {
-					mu.Lock()
+					statsMu.Lock()
 					progressTotal += uint64(len(d.VolumeInfos))
 					progressTotal += uint64(len(d.EcShardInfos))
-					mu.Unlock()
+					statsMu.Unlock()
 
 					for _, v := range d.VolumeInfos {
 						ewg.Add(func() error {
+							defer updateProgress()
+
 							// Collect regular volume stats
 							err := operation.WithVolumeServerClient(false, pb.NewServerAddressWithGrpcPort(dni.Id, int(dni.GrpcPort)), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
 								resp, reqErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{
@@ -246,8 +248,8 @@ func (sp *ClusterStatusPrinter) loadFileStats(commandEnv *CommandEnv) error {
 									return reqErr
 								}
 
-								mu.Lock()
-								defer mu.Unlock()
+								statsMu.Lock()
+								defer statsMu.Unlock()
 								if resp != nil {
 									if _, ok := sp.regularVolumesStats[v.Id]; !ok {
 										sp.regularVolumesStats[v.Id] = []*VolumeReplicaStats{}
@@ -263,26 +265,24 @@ func (sp *ClusterStatusPrinter) loadFileStats(commandEnv *CommandEnv) error {
 								return nil
 							})
 
-							updateProgress()
 							return err
 						})
 					}
 
 					for _, eci := range d.EcShardInfos {
 						ewg.Add(func() error {
-							// Collect EC shard stats
-
-							var err error
+							defer updateProgress()
 
-							mu.Lock()
+							// Collect EC shard stats
+							statsMu.Lock()
 							_, ok := sp.ecVolumesStats[eci.Id]
-							mu.Unlock()
+							statsMu.Unlock()
 							if ok {
 								// this EC volume has been already processed, likely on a different node
 								return nil
 							}
 
-							err = operation.WithVolumeServerClient(false, pb.NewServerAddressWithGrpcPort(dni.Id, int(dni.GrpcPort)), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+							err := operation.WithVolumeServerClient(false, pb.NewServerAddressWithGrpcPort(dni.Id, int(dni.GrpcPort)), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
								resp, reqErr := volumeServerClient.VolumeEcShardsInfo(context.Background(), &volume_server_pb.VolumeEcShardsInfoRequest{
 									VolumeId: uint32(eci.Id),
 								})
@@ -290,8 +290,8 @@ func (sp *ClusterStatusPrinter) loadFileStats(commandEnv *CommandEnv) error {
 									return reqErr
 								}
 
-								mu.Lock()
-								defer mu.Unlock()
+								statsMu.Lock()
+								defer statsMu.Unlock()
 								if resp != nil {
 									sp.ecVolumesStats[eci.Id] = &EcVolumeStats{
 										VolumeId: eci.Id,
@@ -303,7 +303,6 @@ func (sp *ClusterStatusPrinter) loadFileStats(commandEnv *CommandEnv) error {
 								return nil
 							})
 
-							updateProgress()
 							return err
 						})
 					}