Browse Source

Merge d2049820d4 into e9da64f62a

pull/7612/merge
Lisandro Pin 13 hours ago
committed by GitHub
parent
commit
10bd774fdb
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 55
      weed/shell/command_volume_check_disk.go

55
weed/shell/command_volume_check_disk.go

@ -64,9 +64,9 @@ func (c *commandVolumeCheckDisk) Help() string {
append entries in B and not in A to A
optionally, for each non-writable volume replica A
if volume is not full
select a writable volume replica B
if entries in A don't match B
prune late volume entries not matching its index file
select a writable volume replica B
append missing entries from B into A
mark the volume as writable (healthy)
@ -179,9 +179,16 @@ func (vcd *volumeCheckDisk) checkWritableVolumes(volumeReplicas map[uint32][]*Vo
writableReplicas = append(writableReplicas[:1], writableReplicas[2:]...)
continue
}
if err := vcd.syncTwoReplicas(a, b, true); err != nil {
vcd.write("sync volume %d on %s and %s: %v", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err)
modified, err := vcd.syncTwoReplicas(a, b, true)
if err != nil {
vcd.write("failed to sync volumes %d on %s and %s: %v", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err)
} else {
if modified {
vcd.write("synced %s and %s for volume %d", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id)
}
}
// always choose the larger volume to be the source
if a.info.FileCount > b.info.FileCount {
writableReplicas = append(writableReplicas[:1], writableReplicas[2:]...)
@ -280,19 +287,25 @@ func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*Vo
return err
}
// ...fix it...
// TODO: test whether syncTwoReplicas() is enough to prune garbage entries on broken volumes.
if err := vcd.syncTwoReplicas(source, r, false); err != nil {
vcd.write("sync read-only volume %d on %s from %s: %v\n", vid, r.location.dataNode.Id, source.location.dataNode.Id, err)
// ...try to fix it...
// TODO: test whether syncTwoReplicas() is enough to prune garbage entries on broken volumes...
modified, err := vcd.syncTwoReplicas(source, r, false)
if err != nil {
vcd.write("sync read-only volume %d on %s from %s: %v", vid, r.location.dataNode.Id, source.location.dataNode.Id, err)
// ...or revert it back to read-only, if something went wrong.
// TODO: we should keep unchanged volumes as read-only, so we don't modify valid volumes which are full.
if roErr := vcd.makeVolumeReadonly(vid, r); roErr != nil {
return fmt.Errorf("failed to make volume %d on %s readonly after: %v: %v", vid, r.location.dataNode.Id, err, roErr)
return fmt.Errorf("failed to revert volume %d on %s to readonly after: %v: %v", vid, r.location.dataNode.Id, err, roErr)
}
vcd.write("volume %d on %s is now read-only\n", vid, r.location.dataNode.Id)
return err
} else {
if modified {
vcd.write("volume %d on %s is now synced to %d and writable", vid, r.location.dataNode.Id, source.location.dataNode.Id)
} else {
// ...or restore back to read-only, if no changes were made.
if err := vcd.makeVolumeReadonly(vid, r); err != nil {
return fmt.Errorf("failed to revert volume %d on %s to readonly: %v", vid, r.location.dataNode.Id, err)
}
}
}
return nil
@ -411,35 +424,39 @@ func (vcd *volumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica) (bool, error)
// syncTwoReplicas attempts to sync all entries from a source volume replica into a target. If bi-directional mode
// is enabled, changes from target are also synced back into the source.
func (vcd *volumeCheckDisk) syncTwoReplicas(source, target *VolumeReplica, bidi bool) (err error) {
// Returns true if source and/or target were modified, false otherwise.
func (vcd *volumeCheckDisk) syncTwoReplicas(source, target *VolumeReplica, bidi bool) (modified bool, err error) {
sourceHasChanges, targetHasChanges := true, true
const maxIterations = 5
iteration := 0
modified = false
for (sourceHasChanges || targetHasChanges) && iteration < maxIterations {
iteration++
vcd.writeVerbose("sync iteration %d/%d for volume %d", iteration, maxIterations, source.info.Id)
prevSourceHasChanges, prevTargetHasChanges := sourceHasChanges, targetHasChanges
if sourceHasChanges, targetHasChanges, err = vcd.checkBoth(source, target, bidi); err != nil {
return err
return modified, err
}
modified = modified || sourceHasChanges || targetHasChanges
// Detect if we're stuck in a loop with no progress
if iteration > 1 && prevSourceHasChanges == sourceHasChanges && prevTargetHasChanges == targetHasChanges && (sourceHasChanges || targetHasChanges) {
vcd.write("volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop",
source.info.Id, source.location.dataNode.Id, target.location.dataNode.Id, iteration)
return fmt.Errorf("sync not making progress after %d iterations", iteration)
return modified, fmt.Errorf("sync not making progress after %d iterations", iteration)
}
}
if iteration >= maxIterations && (sourceHasChanges || targetHasChanges) {
vcd.write("volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention",
source.info.Id, maxIterations, source.location.dataNode.Id, target.location.dataNode.Id)
return fmt.Errorf("reached maximum sync iterations (%d)", maxIterations)
return modified, fmt.Errorf("reached maximum sync iterations (%d)", maxIterations)
}
return nil
return modified, nil
}
// checkBoth performs a sync between source and target volume replicas. If bi-directional mode is enabled, changes from target are also synced back into the source.
@ -628,7 +645,7 @@ func (vcd *volumeCheckDisk) copyVolumeIndexFile(collection string, volumeId uint
copyFileClient, err := volumeServerClient.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
VolumeId: volumeId,
Ext: ".idx",
Ext: ext,
CompactionRevision: math.MaxUint32,
StopOffset: math.MaxInt64,
Collection: collection,

Loading…
Cancel
Save