You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

272 lines
8.6 KiB

more solid weed mount (#4089) * compare chunks by timestamp * fix slab clearing error * fix test compilation * move oldest chunk to sealed, instead of by fullness * lock on fh.entryViewCache * remove verbose logs * revert slat clearing * less logs * less logs * track write and read by timestamp * remove useless logic * add entry lock on file handle release * use mem chunk only, swap file chunk has problems * comment out code that maybe used later * add debug mode to compare data read and write * more efficient readResolvedChunks with linked list * small optimization * fix test compilation * minor fix on writer * add SeparateGarbageChunks * group chunks into sections * turn off debug mode * fix tests * fix tests * tmp enable swap file chunk * Revert "tmp enable swap file chunk" This reverts commit 985137ec472924e4815f258189f6ca9f2168a0a7. * simple refactoring * simple refactoring * do not re-use swap file chunk. Sealed chunks should not be re-used. * comment out debugging facilities * either mem chunk or swap file chunk is fine now * remove orderedMutex as *semaphore.Weighted not found impactful * optimize size calculation for changing large files * optimize performance to avoid going through the long list of chunks * still problems with swap file chunk * rename * tiny optimization * swap file chunk save only successfully read data * fix * enable both mem and swap file chunk * resolve chunks with range * rename * fix chunk interval list * also change file handle chunk group when adding chunks * pick in-active chunk with time-decayed counter * fix compilation * avoid nil with empty fh.entry * refactoring * rename * rename * refactor visible intervals to *list.List * refactor chunkViews to *list.List * add IntervalList for generic interval list * change visible interval to use IntervalList in generics * cahnge chunkViews to *IntervalList[*ChunkView] * use NewFileChunkSection to create * rename variables * refactor * fix renaming leftover * renaming * renaming * add insert interval * interval list adds lock * incrementally add chunks to readers Fixes: 1. set start and stop offset for the value object 2. clone the value object 3. use pointer instead of copy-by-value when passing to interval.Value 4. use insert interval since adding chunk could be out of order * fix tests compilation * fix tests compilation
2 years ago
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/filer"
  7. "github.com/seaweedfs/seaweedfs/weed/operation"
  8. "github.com/seaweedfs/seaweedfs/weed/pb"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  12. "github.com/seaweedfs/seaweedfs/weed/storage"
  13. "github.com/seaweedfs/seaweedfs/weed/util"
  14. "go.uber.org/atomic"
  15. "golang.org/x/exp/slices"
  16. "io"
  17. "math"
  18. "strings"
  19. "sync"
  20. "time"
  21. )
  22. func init() {
  23. Commands = append(Commands, &commandFsVerify{})
  24. }
  25. type commandFsVerify struct {
  26. env *CommandEnv
  27. volumeServers []pb.ServerAddress
  28. volumeIds map[uint32][]pb.ServerAddress
  29. verbose *bool
  30. metadataFromLog *bool
  31. concurrency *int
  32. modifyTimeAgoAtSec int64
  33. writer io.Writer
  34. waitChan map[string]chan struct{}
  35. waitChanLock sync.RWMutex
  36. }
  37. func (c *commandFsVerify) Name() string {
  38. return "fs.verify"
  39. }
  40. func (c *commandFsVerify) Help() string {
  41. return `recursively verify all files under a directory
  42. fs.verify [-v] [-modifyTimeAgo 1h] /buckets/dir
  43. `
  44. }
  45. func (c *commandFsVerify) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  46. c.env = commandEnv
  47. c.writer = writer
  48. fsVerifyCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  49. c.verbose = fsVerifyCommand.Bool("v", false, "print out each processed files")
  50. modifyTimeAgo := fsVerifyCommand.Duration("modifyTimeAgo", 0, "only include files after this modify time to verify")
  51. c.concurrency = fsVerifyCommand.Int("concurrency", 0, "number of parallel verification per volume server")
  52. c.metadataFromLog = fsVerifyCommand.Bool("metadataFromLog", false, "Using filer log to get metadata")
  53. if err = fsVerifyCommand.Parse(args); err != nil {
  54. return err
  55. }
  56. path, parseErr := commandEnv.parseUrl(findInputDirectory(fsVerifyCommand.Args()))
  57. if parseErr != nil {
  58. return parseErr
  59. }
  60. c.modifyTimeAgoAtSec = int64(modifyTimeAgo.Seconds())
  61. c.volumeIds = make(map[uint32][]pb.ServerAddress)
  62. c.waitChan = make(map[string]chan struct{})
  63. c.volumeServers = []pb.ServerAddress{}
  64. defer func() {
  65. c.modifyTimeAgoAtSec = 0
  66. c.volumeIds = nil
  67. c.waitChan = nil
  68. c.volumeServers = nil
  69. }()
  70. if err := c.collectVolumeIds(); err != nil {
  71. return parseErr
  72. }
  73. if *c.concurrency > 0 {
  74. for _, volumeServer := range c.volumeServers {
  75. volumeServerStr := string(volumeServer)
  76. c.waitChan[volumeServerStr] = make(chan struct{}, *c.concurrency)
  77. defer close(c.waitChan[volumeServerStr])
  78. }
  79. }
  80. var fCount, eConut uint64
  81. if *c.metadataFromLog {
  82. itemErrCount := atomic.NewUint64(0)
  83. var wg sync.WaitGroup
  84. fCount, err = c.verifyProcessMetadata(path, itemErrCount, &wg)
  85. wg.Wait()
  86. eConut = itemErrCount.Load()
  87. } else {
  88. fCount, eConut, err = c.verifyTraverseBfs(path)
  89. }
  90. fmt.Fprintf(writer, "verified %d files, error %d files \n", fCount, eConut)
  91. return err
  92. }
  93. func (c *commandFsVerify) collectVolumeIds() error {
  94. topologyInfo, _, err := collectTopologyInfo(c.env, 0)
  95. if err != nil {
  96. return err
  97. }
  98. eachDataNode(topologyInfo, func(dc string, rack RackId, nodeInfo *master_pb.DataNodeInfo) {
  99. for _, diskInfo := range nodeInfo.DiskInfos {
  100. for _, vi := range diskInfo.VolumeInfos {
  101. volumeServer := pb.NewServerAddressFromDataNode(nodeInfo)
  102. c.volumeIds[vi.Id] = append(c.volumeIds[vi.Id], volumeServer)
  103. if !slices.Contains(c.volumeServers, volumeServer) {
  104. c.volumeServers = append(c.volumeServers, volumeServer)
  105. }
  106. }
  107. }
  108. })
  109. return nil
  110. }
  111. func (c *commandFsVerify) verifyChunk(volumeServer pb.ServerAddress, fileId *filer_pb.FileId) error {
  112. err := operation.WithVolumeServerClient(false, volumeServer, c.env.option.GrpcDialOption,
  113. func(client volume_server_pb.VolumeServerClient) error {
  114. _, err := client.VolumeNeedleStatus(context.Background(),
  115. &volume_server_pb.VolumeNeedleStatusRequest{
  116. VolumeId: fileId.VolumeId,
  117. NeedleId: fileId.FileKey})
  118. return err
  119. },
  120. )
  121. if err != nil && !strings.Contains(err.Error(), storage.ErrorDeleted.Error()) {
  122. return err
  123. }
  124. return nil
  125. }
  126. type ItemEntry struct {
  127. chunks []*filer_pb.FileChunk
  128. path util.FullPath
  129. }
  130. func (c *commandFsVerify) verifyProcessMetadata(path string, errorCount *atomic.Uint64, wg *sync.WaitGroup) (fileCount uint64, err error) {
  131. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  132. message := resp.EventNotification
  133. if resp.EventNotification.NewEntry == nil || len(message.NewEntry.Chunks) == 0 {
  134. return nil
  135. }
  136. entryPath := fmt.Sprintf("%s/%s", message.NewParentPath, message.NewEntry.Name)
  137. if c.verifyEntry(entryPath, message.NewEntry.Chunks, errorCount, wg) {
  138. if *c.verbose {
  139. fmt.Fprintf(c.writer, "file: %s needles:%d verifed\n", entryPath, len(message.NewEntry.Chunks))
  140. }
  141. fileCount++
  142. }
  143. return nil
  144. }
  145. metadataFollowOption := &pb.MetadataFollowOption{
  146. ClientName: "shell_verify",
  147. ClientId: util.RandomInt32(),
  148. ClientEpoch: 0,
  149. SelfSignature: 0,
  150. PathPrefix: path,
  151. AdditionalPathPrefixes: nil,
  152. DirectoriesToWatch: nil,
  153. StartTsNs: time.Now().Add(-1 * time.Second * time.Duration(c.modifyTimeAgoAtSec)).UnixNano(),
  154. StopTsNs: time.Now().UnixNano(),
  155. EventErrorType: pb.TrivialOnError,
  156. }
  157. return fileCount, pb.FollowMetadata(c.env.option.FilerAddress, c.env.option.GrpcDialOption, metadataFollowOption, processEventFn)
  158. }
  159. func (c *commandFsVerify) verifyEntry(path string, chunks []*filer_pb.FileChunk, errorCount *atomic.Uint64, wg *sync.WaitGroup) bool {
  160. fileMsg := fmt.Sprintf("file:%s", path)
  161. itemIsVerifed := atomic.NewBool(true)
  162. for _, chunk := range chunks {
  163. if volumeIds, ok := c.volumeIds[chunk.Fid.VolumeId]; ok {
  164. for _, volumeServer := range volumeIds {
  165. if *c.concurrency == 0 {
  166. if err := c.verifyChunk(volumeServer, chunk.Fid); err != nil {
  167. fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n",
  168. fileMsg, chunk.GetFileIdString(), err)
  169. if itemIsVerifed.Load() {
  170. itemIsVerifed.Store(false)
  171. errorCount.Add(1)
  172. }
  173. }
  174. continue
  175. }
  176. c.waitChanLock.RLock()
  177. waitChan, ok := c.waitChan[string(volumeServer)]
  178. c.waitChanLock.RUnlock()
  179. if !ok {
  180. fmt.Fprintf(c.writer, "%s failed to get channel for %s fileId: %s\n",
  181. string(volumeServer), fileMsg, chunk.GetFileIdString())
  182. if itemIsVerifed.Load() {
  183. itemIsVerifed.Store(false)
  184. errorCount.Add(1)
  185. }
  186. continue
  187. }
  188. wg.Add(1)
  189. waitChan <- struct{}{}
  190. go func(fChunk *filer_pb.FileChunk, path string, volumeServer pb.ServerAddress, msg string) {
  191. defer wg.Done()
  192. if err := c.verifyChunk(volumeServer, fChunk.Fid); err != nil {
  193. fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n",
  194. msg, fChunk.GetFileIdString(), err)
  195. if itemIsVerifed.Load() {
  196. itemIsVerifed.Store(false)
  197. errorCount.Add(1)
  198. }
  199. }
  200. <-waitChan
  201. }(chunk, path, volumeServer, fileMsg)
  202. }
  203. } else {
  204. err := fmt.Errorf("volumeId %d not found", chunk.Fid.VolumeId)
  205. fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n",
  206. fileMsg, chunk.GetFileIdString(), err)
  207. if itemIsVerifed.Load() {
  208. itemIsVerifed.Store(false)
  209. errorCount.Add(1)
  210. }
  211. break
  212. }
  213. }
  214. return itemIsVerifed.Load()
  215. }
  216. func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount uint64, errCount uint64, err error) {
  217. timeNowAtSec := time.Now().Unix()
  218. return fileCount, errCount, doTraverseBfsAndSaving(c.env, c.writer, path, false,
  219. func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
  220. if c.modifyTimeAgoAtSec > 0 {
  221. if entry.Entry.Attributes != nil && c.modifyTimeAgoAtSec < timeNowAtSec-entry.Entry.Attributes.Mtime {
  222. return nil
  223. }
  224. }
  225. dataChunks, manifestChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.GetChunks(), 0, math.MaxInt64)
  226. if resolveErr != nil {
  227. return fmt.Errorf("failed to ResolveChunkManifest: %+v", resolveErr)
  228. }
  229. dataChunks = append(dataChunks, manifestChunks...)
  230. if len(dataChunks) > 0 {
  231. outputChan <- &ItemEntry{
  232. chunks: dataChunks,
  233. path: util.NewFullPath(entry.Dir, entry.Entry.Name),
  234. }
  235. }
  236. return nil
  237. },
  238. func(outputChan chan interface{}) {
  239. var wg sync.WaitGroup
  240. itemErrCount := atomic.NewUint64(0)
  241. for itemEntry := range outputChan {
  242. i := itemEntry.(*ItemEntry)
  243. itemPath := string(i.path)
  244. if c.verifyEntry(itemPath, i.chunks, itemErrCount, &wg) {
  245. if *c.verbose {
  246. fmt.Fprintf(c.writer, "file: %s needles:%d verifed\n", itemPath, len(i.chunks))
  247. }
  248. fileCount++
  249. }
  250. }
  251. wg.Wait()
  252. errCount = itemErrCount.Load()
  253. })
  254. }