|
|
@ -13,6 +13,7 @@ import ( |
|
|
|
"google.golang.org/grpc" |
|
|
|
"io" |
|
|
|
"math" |
|
|
|
"net/http" |
|
|
|
"time" |
|
|
|
) |
|
|
|
|
|
|
@ -21,7 +22,8 @@ func init() { |
|
|
|
} |
|
|
|
|
|
|
|
type commandVolumeCheckDisk struct { |
|
|
|
env *CommandEnv |
|
|
|
env *CommandEnv |
|
|
|
syncDeletions *bool |
|
|
|
} |
|
|
|
|
|
|
|
func (c *commandVolumeCheckDisk) Name() string { |
|
|
@ -49,6 +51,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write |
|
|
|
verbose := fsckCommand.Bool("v", false, "verbose mode") |
|
|
|
volumeId := fsckCommand.Uint("volumeId", 0, "the volume id") |
|
|
|
applyChanges := fsckCommand.Bool("force", false, "apply the fix") |
|
|
|
c.syncDeletions = fsckCommand.Bool("syncDeleted", false, "sync of deletions the fix") |
|
|
|
nonRepairThreshold := fsckCommand.Float64("nonRepairThreshold", 0.3, "repair when missing keys is not more than this limit") |
|
|
|
if err = fsckCommand.Parse(args); err != nil { |
|
|
|
return nil |
|
|
@ -145,13 +148,17 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m |
|
|
|
// find missing keys
|
|
|
|
// hash join, can be more efficient
|
|
|
|
var missingNeedles []needle_map.NeedleValue |
|
|
|
var partiallyDeletedNeedles []needle_map.NeedleValue |
|
|
|
var counter int |
|
|
|
doCutoffOfLastNeedle := true |
|
|
|
minuend.DescendingVisit(func(value needle_map.NeedleValue) error { |
|
|
|
minuend.DescendingVisit(func(minuendValue needle_map.NeedleValue) error { |
|
|
|
counter++ |
|
|
|
if _, found := subtrahend.Get(value.Key); !found { |
|
|
|
if subtrahendValue, found := subtrahend.Get(minuendValue.Key); !found { |
|
|
|
if minuendValue.Size.IsDeleted() { |
|
|
|
return nil |
|
|
|
} |
|
|
|
if doCutoffOfLastNeedle { |
|
|
|
if needleMeta, err := readNeedleMeta(c.env.option.GrpcDialOption, pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, value); err == nil { |
|
|
|
if needleMeta, err := readNeedleMeta(c.env.option.GrpcDialOption, pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, minuendValue); err == nil { |
|
|
|
// needles older than the cutoff time are not missing yet
|
|
|
|
if needleMeta.AppendAtNs > cutoffFromAtNs { |
|
|
|
return nil |
|
|
@ -159,16 +166,22 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m |
|
|
|
doCutoffOfLastNeedle = false |
|
|
|
} |
|
|
|
} |
|
|
|
missingNeedles = append(missingNeedles, value) |
|
|
|
} else if doCutoffOfLastNeedle { |
|
|
|
doCutoffOfLastNeedle = false |
|
|
|
missingNeedles = append(missingNeedles, minuendValue) |
|
|
|
} else { |
|
|
|
if minuendValue.Size.IsDeleted() && !subtrahendValue.Size.IsDeleted() { |
|
|
|
partiallyDeletedNeedles = append(partiallyDeletedNeedles, minuendValue) |
|
|
|
} |
|
|
|
if doCutoffOfLastNeedle { |
|
|
|
doCutoffOfLastNeedle = false |
|
|
|
} |
|
|
|
} |
|
|
|
return nil |
|
|
|
}) |
|
|
|
|
|
|
|
fmt.Fprintf(writer, "volume %d %s has %d entries, %s missed %d entries\n", source.info.Id, source.location.dataNode.Id, counter, target.location.dataNode.Id, len(missingNeedles)) |
|
|
|
fmt.Fprintf(writer, "volume %d %s has %d entries, %s missed %d and partially deleted %d entries\n", |
|
|
|
source.info.Id, source.location.dataNode.Id, counter, target.location.dataNode.Id, len(missingNeedles), len(partiallyDeletedNeedles)) |
|
|
|
|
|
|
|
if counter == 0 || len(missingNeedles) == 0 { |
|
|
|
if counter == 0 || (len(missingNeedles) == 0 && len(partiallyDeletedNeedles) == 0) { |
|
|
|
return false, nil |
|
|
|
} |
|
|
|
|
|
|
@ -190,7 +203,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m |
|
|
|
} |
|
|
|
|
|
|
|
if verbose { |
|
|
|
fmt.Fprintf(writer, "read %d,%x %s => %s \n", source.info.Id, needleValue.Key, source.location.dataNode.Id, target.location.dataNode.Id) |
|
|
|
fmt.Fprintf(writer, "read %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) |
|
|
|
} |
|
|
|
|
|
|
|
hasChanges = true |
|
|
@ -201,6 +214,27 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
if *c.syncDeletions && len(partiallyDeletedNeedles) > 0 { |
|
|
|
var fidList []string |
|
|
|
for _, needleValue := range partiallyDeletedNeedles { |
|
|
|
fidList = append(fidList, needleValue.Key.FileId(source.info.Id)) |
|
|
|
if verbose { |
|
|
|
fmt.Fprintf(writer, "delete %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) |
|
|
|
} |
|
|
|
} |
|
|
|
deleteResults, deleteErr := operation.DeleteFilesAtOneVolumeServer( |
|
|
|
pb.NewServerAddressFromDataNode(target.location.dataNode), |
|
|
|
c.env.option.GrpcDialOption, fidList, false) |
|
|
|
if deleteErr != nil { |
|
|
|
return hasChanges, deleteErr |
|
|
|
} |
|
|
|
for _, deleteResult := range deleteResults { |
|
|
|
if deleteResult.Status == http.StatusAccepted { |
|
|
|
hasChanges = true |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
@ -245,9 +279,7 @@ func (c *commandVolumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collect |
|
|
|
if verbose { |
|
|
|
fmt.Fprintf(writer, "load collection %s volume %d index size %d from %s ...\n", collection, volumeId, buf.Len(), volumeServer) |
|
|
|
} |
|
|
|
|
|
|
|
return db.LoadFromReaderAt(bytes.NewReader(buf.Bytes())) |
|
|
|
|
|
|
|
return db.LoadFilterFromReaderAt(bytes.NewReader(buf.Bytes()), true, false) |
|
|
|
} |
|
|
|
|
|
|
|
func (c *commandVolumeCheckDisk) copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.ServerAddress, buf *bytes.Buffer, verbose bool, writer io.Writer) error { |
|
|
|