diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go index 65d79da94..5ac5db5f4 100644 --- a/weed/shell/command_fs_meta_save.go +++ b/weed/shell/command_fs_meta_save.go @@ -69,14 +69,15 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. } defer dst.Close() - return doTraverseBfsAndSaving(commandEnv, writer, path, *verbose, func(outputChan chan []byte) { + return doTraverseBfsAndSaving(commandEnv, writer, path, *verbose, func(outputChan chan interface{}) { sizeBuf := make([]byte, 4) - for b := range outputChan { + for item := range outputChan { + b := item.([]byte) util.Uint32toBytes(sizeBuf, uint32(len(b))) dst.Write(sizeBuf) dst.Write(b) } - }, func(entry *filer_pb.FullEntry, outputChan chan []byte) (err error) { + }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) { bytes, err := proto.Marshal(entry) if err != nil { fmt.Fprintf(writer, "marshall error: %v\n", err) @@ -87,41 +88,13 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. return nil }) - var chunksFileName = "" - if chunksFileName != "" { - - dst, openErr := os.OpenFile(chunksFileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) - if openErr != nil { - return fmt.Errorf("failed to create file %s: %v", chunksFileName, openErr) - } - defer dst.Close() - - return doTraverseBfsAndSaving(commandEnv, writer, path, *verbose, func(outputChan chan []byte) { - for b := range outputChan { - dst.Write(b) - } - }, func(entry *filer_pb.FullEntry, outputChan chan []byte) (err error) { - for _, chunk := range entry.Entry.Chunks { - dir := entry.Dir - if dir == "/" { - dir = "" - } - outputLine := fmt.Sprintf("%d\t%s\t%s/%s\n", chunk.Fid.FileKey, chunk.FileId, dir, entry.Entry.Name) - outputChan <- []byte(outputLine) - } - return nil - }) - } - - return err - } -func doTraverseBfsAndSaving(commandEnv *CommandEnv, writer io.Writer, path string, verbose bool, saveFn func(outputChan chan []byte), genFn func(entry *filer_pb.FullEntry, outputChan chan []byte) error) error { +func doTraverseBfsAndSaving(commandEnv *CommandEnv, writer io.Writer, path string, verbose bool, saveFn func(outputChan chan interface{}), genFn func(entry *filer_pb.FullEntry, outputChan chan interface{}) error) error { var wg sync.WaitGroup wg.Add(1) - outputChan := make(chan []byte, 1024) + outputChan := make(chan interface{}, 1024) go func() { saveFn(outputChan) wg.Done() @@ -157,7 +130,7 @@ func doTraverseBfsAndSaving(commandEnv *CommandEnv, writer io.Writer, path strin wg.Wait() - if err == nil { + if err == nil && writer != nil { fmt.Fprintf(writer, "total %d directories, %d files\n", dirCount, fileCount) } return err diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go new file mode 100644 index 000000000..d06782f3b --- /dev/null +++ b/weed/shell/command_volume_fsck.go @@ -0,0 +1,263 @@ +package shell + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "math" + "os" + "path/filepath" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle_map" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func init() { + Commands = append(Commands, &commandVolumeFsck{}) +} + +type commandVolumeFsck struct { + env *CommandEnv +} + +func (c *commandVolumeFsck) Name() string { + return "volume.fsck" +} + +func (c *commandVolumeFsck) Help() string { + return `check all volumes to find entries not used by the filer + + Important assumption!!! + the system is all used by one filer. + + This command works this way: + 1. collect all file ids from all volumes, as set A + 2. collect all file ids from the filer, as set B + 3. find out the set A subtract B + +` +} + +func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + c.env = commandEnv + + // collect all volume id locations + volumeIdToServer, err := c.collectVolumeIds() + if err != nil { + return fmt.Errorf("failed to collect all volume locations: %v", err) + } + + // create a temp folder + tempFolder, err := ioutil.TempDir("", "sw_fsck") + if err != nil { + return fmt.Errorf("failed to create temp folder: %v", err) + } + fmt.Fprintf(writer, "working directory: %s\n", tempFolder) + + // collect each volume file ids + for volumeId, vinfo := range volumeIdToServer { + err = c.collectOneVolumeFileIds(tempFolder, volumeId, vinfo) + if err != nil { + return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err) + } + } + + // collect all filer file ids + if err = c.collectFilerFileIds(tempFolder, volumeIdToServer); err != nil { + return fmt.Errorf("failed to collect file ids from filer: %v", err) + } + + // volume file ids substract filer file ids + var totalOrphanChunkCount, totalOrphanDataSize uint64 + for volumeId, server := range volumeIdToServer { + orphanChunkCount, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, volumeId) + if checkErr != nil { + return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, server, checkErr) + } + totalOrphanChunkCount += orphanChunkCount + totalOrphanDataSize += orphanDataSize + } + + if totalOrphanChunkCount > 0 { + fmt.Fprintf(writer, "total %d orphan chunks, %d bytes\n", totalOrphanChunkCount, totalOrphanDataSize) + } else { + fmt.Fprintf(writer, "no orphan data\n") + } + + return nil +} + +func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId uint32, vinfo VInfo) error { + + return operation.WithVolumeServerClient(vinfo.server, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + + copyFileClient, err := volumeServerClient.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ + VolumeId: volumeId, + Ext: ".idx", + CompactionRevision: math.MaxUint32, + StopOffset: math.MaxInt64, + Collection: vinfo.collection, + IsEcVolume: vinfo.isEcVolume, + IgnoreSourceFileNotFound: false, + }) + if err != nil { + return fmt.Errorf("failed to start copying volume %d.idx: %v", volumeId, err) + } + + err = writeToFile(copyFileClient, getVolumeFileIdFile(tempFolder, volumeId)) + if err != nil { + return fmt.Errorf("failed to copy %s.idx from %s: %v", volumeId, vinfo.server, err) + } + + return nil + + }) + +} + +func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToServer map[uint32]VInfo) error { + + files := make(map[uint32]*os.File) + for vid := range volumeIdToServer { + dst, openErr := os.OpenFile(getFilerFileIdFile(tempFolder, vid), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if openErr != nil { + return fmt.Errorf("failed to create file %s: %v", getFilerFileIdFile(tempFolder, vid), openErr) + } + files[vid] = dst + } + defer func() { + for _, f := range files { + f.Close() + } + }() + + type Item struct { + vid uint32 + fileKey uint64 + } + return doTraverseBfsAndSaving(c.env, nil, "/", false, func(outputChan chan interface{}) { + buffer := make([]byte, 8) + for item := range outputChan { + i := item.(*Item) + util.Uint64toBytes(buffer, i.fileKey) + files[i.vid].Write(buffer) + } + }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) { + for _, chunk := range entry.Entry.Chunks { + outputChan <- &Item{ + vid: chunk.Fid.VolumeId, + fileKey: chunk.Fid.FileKey, + } + } + return nil + }) +} + +func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder string, volumeId uint32) (orphanChunkCount, orphanDataSize uint64, err error) { + + db := needle_map.NewMemDb() + defer db.Close() + + if err = db.LoadFromIdx(getVolumeFileIdFile(tempFolder, volumeId)); err != nil { + return + } + + filerFileIdsData, err := ioutil.ReadFile(getFilerFileIdFile(tempFolder, volumeId)) + if err != nil { + return + } + + dataLen := len(filerFileIdsData) + if dataLen%8 != 0 { + return 0, 0, fmt.Errorf("filer data is corrupted") + } + + for i := 0; i < len(filerFileIdsData); i += 8 { + fileKey := util.BytesToUint64(filerFileIdsData[i : i+8]) + db.Delete(types.NeedleId(fileKey)) + } + + db.AscendingVisit(func(n needle_map.NeedleValue) error { + fmt.Printf("%d,%x\n", volumeId, n.Key) + orphanChunkCount++ + orphanDataSize += uint64(n.Size) + return nil + }) + + return + +} + +type VInfo struct { + server string + collection string + isEcVolume bool +} + +func (c *commandVolumeFsck) collectVolumeIds() (volumeIdToServer map[uint32]VInfo, err error) { + + volumeIdToServer = make(map[uint32]VInfo) + var resp *master_pb.VolumeListResponse + err = c.env.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) + return err + }) + if err != nil { + return + } + + eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, t *master_pb.DataNodeInfo) { + for _, vi := range t.VolumeInfos { + volumeIdToServer[vi.Id] = VInfo{ + server: t.Id, + collection: vi.Collection, + isEcVolume: false, + } + } + for _, ecShardInfo := range t.EcShardInfos { + volumeIdToServer[ecShardInfo.Id] = VInfo{ + server: t.Id, + collection: ecShardInfo.Collection, + isEcVolume: true, + } + } + }) + + return +} + +func getVolumeFileIdFile(tempFolder string, vid uint32) string { + return filepath.Join(tempFolder, fmt.Sprintf("%d.idx", vid)) +} + +func getFilerFileIdFile(tempFolder string, vid uint32) string { + return filepath.Join(tempFolder, fmt.Sprintf("%d.fid", vid)) +} + +func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string) error { + flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC + dst, err := os.OpenFile(fileName, flags, 0644) + if err != nil { + return nil + } + defer dst.Close() + + for { + resp, receiveErr := client.Recv() + if receiveErr == io.EOF { + break + } + if receiveErr != nil { + return fmt.Errorf("receiving %s: %v", fileName, receiveErr) + } + dst.Write(resp.FileContent) + } + return nil +}