Browse Source

Mutex command output writes for `volume.check.disk`.

Prevents potential screen garbling when operations are parallelized
.Also simplifies logging by automatically adding newlines on output, if necessary.
pull/7605/head
Lisandro Pin 2 days ago
parent
commit
322e01a53e
Failed to extract signature
  1. 50
      weed/shell/command_volume_check_disk.go
  2. 6
      weed/shell/command_volume_check_disk_test.go

50
weed/shell/command_volume_check_disk.go

@ -10,6 +10,8 @@ import (
"math" "math"
"math/rand/v2" "math/rand/v2"
"net/http" "net/http"
"strings"
"sync"
"time" "time"
"slices" "slices"
@ -32,6 +34,7 @@ type commandVolumeCheckDisk struct{}
type volumeCheckDisk struct { type volumeCheckDisk struct {
commandEnv *CommandEnv commandEnv *CommandEnv
writer io.Writer writer io.Writer
writerMu sync.Mutex
now time.Time now time.Time
slowMode bool slowMode bool
@ -146,14 +149,14 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
// checkWritableVolumes fixes volume replicas which are not read-only. // checkWritableVolumes fixes volume replicas which are not read-only.
func (vcd *volumeCheckDisk) checkWritableVolumes(volumeReplicas map[uint32][]*VolumeReplica) error { func (vcd *volumeCheckDisk) checkWritableVolumes(volumeReplicas map[uint32][]*VolumeReplica) error {
vcd.write("Pass #1 (writable volumes)\n")
vcd.write("Pass #1 (writable volumes)")
for _, replicas := range volumeReplicas { for _, replicas := range volumeReplicas {
// filter readonly replica // filter readonly replica
var writableReplicas []*VolumeReplica var writableReplicas []*VolumeReplica
for _, replica := range replicas { for _, replica := range replicas {
if replica.info.ReadOnly { if replica.info.ReadOnly {
vcd.write("skipping readonly volume %d on %s\n", replica.info.Id, replica.location.dataNode.Id)
vcd.write("skipping readonly volume %d on %s", replica.info.Id, replica.location.dataNode.Id)
} else { } else {
writableReplicas = append(writableReplicas, replica) writableReplicas = append(writableReplicas, replica)
} }
@ -166,7 +169,7 @@ func (vcd *volumeCheckDisk) checkWritableVolumes(volumeReplicas map[uint32][]*Vo
a, b := writableReplicas[0], writableReplicas[1] a, b := writableReplicas[0], writableReplicas[1]
shouldSkip, err := vcd.shouldSkipVolume(a, b) shouldSkip, err := vcd.shouldSkipVolume(a, b)
if err != nil { if err != nil {
vcd.write("error checking if volume %d should be skipped: %v\n", a.info.Id, err)
vcd.write("error checking if volume %d should be skipped: %v", a.info.Id, err)
// Continue with sync despite error to be safe // Continue with sync despite error to be safe
} else if shouldSkip { } else if shouldSkip {
// always choose the larger volume to be the source // always choose the larger volume to be the source
@ -174,7 +177,7 @@ func (vcd *volumeCheckDisk) checkWritableVolumes(volumeReplicas map[uint32][]*Vo
continue continue
} }
if err := vcd.syncTwoReplicas(a, b, true); err != nil { if err := vcd.syncTwoReplicas(a, b, true); err != nil {
vcd.write("sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err)
vcd.write("sync volume %d on %s and %s: %v", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err)
} }
// always choose the larger volume to be the source // always choose the larger volume to be the source
if a.info.FileCount > b.info.FileCount { if a.info.FileCount > b.info.FileCount {
@ -204,7 +207,7 @@ func (vcd *volumeCheckDisk) makeVolumeWritable(vid uint32, vr *VolumeReplica) er
return err return err
} }
vcd.write("volume %d on %s is now writable\n", vid, vr.location.dataNode.Id)
vcd.write("volume %d on %s is now writable", vid, vr.location.dataNode.Id)
return nil return nil
} }
@ -224,7 +227,7 @@ func (vcd *volumeCheckDisk) makeVolumeReadonly(vid uint32, vr *VolumeReplica) er
return err return err
} }
vcd.write("volume %d on %s is now read-only\n", vid, vr.location.dataNode.Id)
vcd.write("volume %d on %s is now read-only", vid, vr.location.dataNode.Id)
return nil return nil
} }
@ -232,7 +235,7 @@ func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*Vo
if !vcd.fixReadOnly { if !vcd.fixReadOnly {
return nil return nil
} }
vcd.write("Pass #2 (read-only volumes)\n")
vcd.write("Pass #2 (read-only volumes)")
for vid, replicas := range volumeReplicas { for vid, replicas := range volumeReplicas {
roReplicas := []*VolumeReplica{} roReplicas := []*VolumeReplica{}
@ -246,11 +249,11 @@ func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*Vo
} }
} }
if len(roReplicas) == 0 { if len(roReplicas) == 0 {
vcd.write("no read-only replicas for volume %d\n", vid)
vcd.write("no read-only replicas for volume %d", vid)
continue continue
} }
if len(rwReplicas) == 0 { if len(rwReplicas) == 0 {
vcd.write("got %d read-only replicas for volume %d and no writable replicas to fix from\n", len(roReplicas), vid)
vcd.write("got %d read-only replicas for volume %d and no writable replicas to fix from", len(roReplicas), vid)
continue continue
} }
@ -261,7 +264,7 @@ func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*Vo
skip, err := vcd.shouldSkipVolume(r, source) skip, err := vcd.shouldSkipVolume(r, source)
if err != nil { if err != nil {
vcd.write("error checking if volume %d should be skipped: %v\n", r.info.Id, err)
vcd.write("error checking if volume %d should be skipped: %v", r.info.Id, err)
continue continue
} }
if skip { if skip {
@ -276,13 +279,13 @@ func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*Vo
// ...fix it... // ...fix it...
// TODO: test whether syncTwoReplicas() is enough to prune garbage entries on broken volumes. // TODO: test whether syncTwoReplicas() is enough to prune garbage entries on broken volumes.
if err := vcd.syncTwoReplicas(source, r, false); err != nil { if err := vcd.syncTwoReplicas(source, r, false); err != nil {
vcd.write("sync read-only volume %d on %s from %s: %v\n", vid, r.location.dataNode.Id, source.location.dataNode.Id, err)
vcd.write("sync read-only volume %d on %s from %s: %v", vid, r.location.dataNode.Id, source.location.dataNode.Id, err)
// ...or revert it back to read-only, if something went wrong. // ...or revert it back to read-only, if something went wrong.
if roErr := vcd.makeVolumeReadonly(vid, r); roErr != nil { if roErr := vcd.makeVolumeReadonly(vid, r); roErr != nil {
return fmt.Errorf("failed to make volume %d on %s readonly after: %v: %v", vid, r.location.dataNode.Id, err, roErr) return fmt.Errorf("failed to make volume %d on %s readonly after: %v: %v", vid, r.location.dataNode.Id, err, roErr)
} }
vcd.write("volume %d on %s is now read-only\n", vid, r.location.dataNode.Id)
vcd.write("volume %d on %s is now read-only", vid, r.location.dataNode.Id)
return err return err
} }
@ -297,12 +300,15 @@ func (vcd *volumeCheckDisk) grpcDialOption() grpc.DialOption {
} }
func (vcd *volumeCheckDisk) write(format string, a ...any) { func (vcd *volumeCheckDisk) write(format string, a ...any) {
fmt.Fprintf(vcd.writer, format, a...)
vcd.writerMu.Lock()
defer vcd.writerMu.Unlock()
fmt.Fprintf(vcd.writer, strings.TrimRight(format, "\r\n "), a...)
fmt.Fprint(vcd.writer, "\n")
} }
func (vcd *volumeCheckDisk) writeVerbose(format string, a ...any) { func (vcd *volumeCheckDisk) writeVerbose(format string, a ...any) {
if vcd.verbose { if vcd.verbose {
fmt.Fprintf(vcd.writer, format, a...)
vcd.write(format, a...)
} }
} }
@ -388,7 +394,7 @@ func (vcd *volumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica) (bool, error)
if doSyncDeletedCount && !eqDeletedFileCount { if doSyncDeletedCount && !eqDeletedFileCount {
return false, nil return false, nil
} }
vcd.writeVerbose("skipping active volumes %d with the same file counts on %s and %s\n",
vcd.writeVerbose("skipping active volumes %d with the same file counts on %s and %s",
a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id) a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id)
} else { } else {
return false, nil return false, nil
@ -406,7 +412,7 @@ func (vcd *volumeCheckDisk) syncTwoReplicas(source, target *VolumeReplica, bidi
for (sourceHasChanges || targetHasChanges) && iteration < maxIterations { for (sourceHasChanges || targetHasChanges) && iteration < maxIterations {
iteration++ iteration++
vcd.writeVerbose("sync iteration %d/%d for volume %d\n", iteration, maxIterations, source.info.Id)
vcd.writeVerbose("sync iteration %d/%d for volume %d", iteration, maxIterations, source.info.Id)
prevSourceHasChanges, prevTargetHasChanges := sourceHasChanges, targetHasChanges prevSourceHasChanges, prevTargetHasChanges := sourceHasChanges, targetHasChanges
if sourceHasChanges, targetHasChanges, err = vcd.checkBoth(source, target, bidi); err != nil { if sourceHasChanges, targetHasChanges, err = vcd.checkBoth(source, target, bidi); err != nil {
@ -415,14 +421,14 @@ func (vcd *volumeCheckDisk) syncTwoReplicas(source, target *VolumeReplica, bidi
// Detect if we're stuck in a loop with no progress // Detect if we're stuck in a loop with no progress
if iteration > 1 && prevSourceHasChanges == sourceHasChanges && prevTargetHasChanges == targetHasChanges && (sourceHasChanges || targetHasChanges) { if iteration > 1 && prevSourceHasChanges == sourceHasChanges && prevTargetHasChanges == targetHasChanges && (sourceHasChanges || targetHasChanges) {
vcd.write("volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop\n",
vcd.write("volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop",
source.info.Id, source.location.dataNode.Id, target.location.dataNode.Id, iteration) source.info.Id, source.location.dataNode.Id, target.location.dataNode.Id, iteration)
return fmt.Errorf("sync not making progress after %d iterations", iteration) return fmt.Errorf("sync not making progress after %d iterations", iteration)
} }
} }
if iteration >= maxIterations && (sourceHasChanges || targetHasChanges) { if iteration >= maxIterations && (sourceHasChanges || targetHasChanges) {
vcd.write("volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention\n",
vcd.write("volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention",
source.info.Id, maxIterations, source.location.dataNode.Id, target.location.dataNode.Id) source.info.Id, maxIterations, source.location.dataNode.Id, target.location.dataNode.Id)
return fmt.Errorf("reached maximum sync iterations (%d)", maxIterations) return fmt.Errorf("reached maximum sync iterations (%d)", maxIterations)
} }
@ -512,7 +518,7 @@ func (vcd *volumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_map.Me
return nil return nil
}) })
vcd.write("volume %d %s has %d entries, %s missed %d and partially deleted %d entries\n",
vcd.write("volume %d %s has %d entries, %s missed %d and partially deleted %d entries",
source.info.Id, source.location.dataNode.Id, counter, target.location.dataNode.Id, len(missingNeedles), len(partiallyDeletedNeedles)) source.info.Id, source.location.dataNode.Id, counter, target.location.dataNode.Id, len(missingNeedles), len(partiallyDeletedNeedles))
if counter == 0 || (len(missingNeedles) == 0 && len(partiallyDeletedNeedles) == 0) { if counter == 0 || (len(missingNeedles) == 0 && len(partiallyDeletedNeedles) == 0) {
@ -536,7 +542,7 @@ func (vcd *volumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_map.Me
continue continue
} }
vcd.writeVerbose("read %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id)
vcd.writeVerbose("read %s %s => %s", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id)
hasChanges = true hasChanges = true
if err = vcd.writeNeedleBlobToTarget(pb.NewServerAddressFromDataNode(target.location.dataNode), source.info.Id, needleValue, needleBlob); err != nil { if err = vcd.writeNeedleBlobToTarget(pb.NewServerAddressFromDataNode(target.location.dataNode), source.info.Id, needleValue, needleBlob); err != nil {
@ -549,7 +555,7 @@ func (vcd *volumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_map.Me
var fidList []string var fidList []string
for _, needleValue := range partiallyDeletedNeedles { for _, needleValue := range partiallyDeletedNeedles {
fidList = append(fidList, needleValue.Key.FileId(source.info.Id)) fidList = append(fidList, needleValue.Key.FileId(source.info.Id))
vcd.writeVerbose("delete %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id)
vcd.writeVerbose("delete %s %s => %s", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id)
} }
deleteResults := operation.DeleteFileIdsAtOneVolumeServer( deleteResults := operation.DeleteFileIdsAtOneVolumeServer(
pb.NewServerAddressFromDataNode(target.location.dataNode), pb.NewServerAddressFromDataNode(target.location.dataNode),
@ -604,7 +610,7 @@ func (vcd *volumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collection s
return err return err
} }
vcd.writeVerbose("load collection %s volume %d index size %d from %s ...\n", collection, volumeId, buf.Len(), volumeServer)
vcd.writeVerbose("load collection %s volume %d index size %d from %s ...", collection, volumeId, buf.Len(), volumeServer)
return db.LoadFilterFromReaderAt(bytes.NewReader(buf.Bytes()), true, false) return db.LoadFilterFromReaderAt(bytes.NewReader(buf.Bytes()), true, false)
} }

6
weed/shell/command_volume_check_disk_test.go

@ -278,14 +278,14 @@ func TestVolumeCheckDiskHelperMethods(t *testing.T) {
} }
// Test write method // Test write method
vcd.write("test %s\n", "message")
vcd.write("test %s", "message")
if buf.String() != "test message\n" { if buf.String() != "test message\n" {
t.Errorf("write() output = %q, want %q", buf.String(), "test message\n") t.Errorf("write() output = %q, want %q", buf.String(), "test message\n")
} }
// Test writeVerbose with verbose=true // Test writeVerbose with verbose=true
buf.Reset() buf.Reset()
vcd.writeVerbose("verbose %d\n", 123)
vcd.writeVerbose("verbose %d", 123)
if buf.String() != "verbose 123\n" { if buf.String() != "verbose 123\n" {
t.Errorf("writeVerbose() with verbose=true output = %q, want %q", buf.String(), "verbose 123\n") t.Errorf("writeVerbose() with verbose=true output = %q, want %q", buf.String(), "verbose 123\n")
} }
@ -293,7 +293,7 @@ func TestVolumeCheckDiskHelperMethods(t *testing.T) {
// Test writeVerbose with verbose=false // Test writeVerbose with verbose=false
buf.Reset() buf.Reset()
vcd.verbose = false vcd.verbose = false
vcd.writeVerbose("should not appear\n")
vcd.writeVerbose("should not appear")
if buf.String() != "" { if buf.String() != "" {
t.Errorf("writeVerbose() with verbose=false output = %q, want empty", buf.String()) t.Errorf("writeVerbose() with verbose=false output = %q, want empty", buf.String())
} }

Loading…
Cancel
Save