|
|
|
@ -183,67 +183,9 @@ func (vcd *volumeCheckDisk) checkWriteableVolumes(volumeReplicas map[uint32][]*V |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// checkReadOnlyVolumes fixes read-only volume replicas by performing uni-directional sync
|
|
|
|
// from a healthy source to read-only targets without modifying the source.
|
|
|
|
// checkReadOnlyVolumes fixes read-only volume replicas.
|
|
|
|
func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*VolumeReplica) error { |
|
|
|
for _, replicas := range volumeReplicas { |
|
|
|
// Filter for read-only replicas
|
|
|
|
var readOnlyReplicas []*VolumeReplica |
|
|
|
for _, replica := range replicas { |
|
|
|
if replica.info.ReadOnly { |
|
|
|
readOnlyReplicas = append(readOnlyReplicas, replica) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Skip if no read-only replicas
|
|
|
|
if len(readOnlyReplicas) == 0 { |
|
|
|
continue |
|
|
|
} |
|
|
|
|
|
|
|
// Find a writable source replica to sync from (prefer the one with highest file count)
|
|
|
|
var sourceReplica *VolumeReplica |
|
|
|
for _, replica := range replicas { |
|
|
|
if !replica.info.ReadOnly { |
|
|
|
if sourceReplica == nil || replica.info.FileCount > sourceReplica.info.FileCount { |
|
|
|
sourceReplica = replica |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// If no writable source found, use the read-only replica with highest file count as source
|
|
|
|
if sourceReplica == nil { |
|
|
|
vcd.writeVerbose("no writable source found for volume %d, using read-only replica with highest file count\n", readOnlyReplicas[0].info.Id) |
|
|
|
slices.SortFunc(readOnlyReplicas, func(a, b *VolumeReplica) int { |
|
|
|
return int(b.info.FileCount - a.info.FileCount) |
|
|
|
}) |
|
|
|
sourceReplica = readOnlyReplicas[0] |
|
|
|
readOnlyReplicas = readOnlyReplicas[1:] |
|
|
|
} |
|
|
|
|
|
|
|
// Perform uni-directional sync from source to each read-only target
|
|
|
|
for _, target := range readOnlyReplicas { |
|
|
|
if !vcd.slowMode { |
|
|
|
shouldSkip, err := vcd.shouldSkipVolume(sourceReplica, target) |
|
|
|
if err != nil { |
|
|
|
vcd.write("error checking if volume %d should be skipped: %v\n", sourceReplica.info.Id, err) |
|
|
|
// Continue with sync despite error to be safe
|
|
|
|
} else if shouldSkip { |
|
|
|
continue |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
vcd.write("syncing read-only volume %d from %s to %s (uni-directional)\n", |
|
|
|
sourceReplica.info.Id, sourceReplica.location.dataNode.Id, target.location.dataNode.Id) |
|
|
|
|
|
|
|
// Use uni-directional sync (bidi=false) to avoid modifying source
|
|
|
|
if err := vcd.syncTwoReplicas(sourceReplica, target, false); err != nil { |
|
|
|
vcd.write("sync read-only volume %d from %s to %s: %v\n", |
|
|
|
sourceReplica.info.Id, sourceReplica.location.dataNode.Id, target.location.dataNode.Id, err) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
return fmt.Errorf("not yet implemented (https://github.com/seaweedfs/seaweedfs/issues/7442)") |
|
|
|
} |
|
|
|
|
|
|
|
func (vcd *volumeCheckDisk) grpcDialOption() grpc.DialOption { |
|
|
|
|