You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

383 lines
14 KiB

3 years ago
4 years ago
4 years ago
4 years ago
4 years ago
6 months ago
  1. package shell
  2. import (
  3. "bytes"
  4. "context"
  5. "flag"
  6. "fmt"
  7. "github.com/seaweedfs/seaweedfs/weed/operation"
  8. "github.com/seaweedfs/seaweedfs/weed/pb"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  11. "github.com/seaweedfs/seaweedfs/weed/server/constants"
  12. "github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
  13. "golang.org/x/exp/slices"
  14. "google.golang.org/grpc"
  15. "io"
  16. "math"
  17. "net/http"
  18. "sync"
  19. "time"
  20. )
  21. func init() {
  22. Commands = append(Commands, &commandVolumeCheckDisk{})
  23. }
  24. type commandVolumeCheckDisk struct {
  25. env *CommandEnv
  26. writer io.Writer
  27. }
  28. func (c *commandVolumeCheckDisk) Name() string {
  29. return "volume.check.disk"
  30. }
  31. func (c *commandVolumeCheckDisk) Help() string {
  32. return `check all replicated volumes to find and fix inconsistencies. It is optional and resource intensive.
  33. How it works:
  34. find all volumes that are replicated
  35. for each volume id, if there are more than 2 replicas, find one pair with the largest 2 in file count.
  36. for the pair volume A and B
  37. append entries in A and not in B to B
  38. append entries in B and not in A to A
  39. `
  40. }
  41. func (c *commandVolumeCheckDisk) getVolumeStatusFileCount(vid uint32, dn *master_pb.DataNodeInfo) (totalFileCount, deletedFileCount uint64) {
  42. err := operation.WithVolumeServerClient(false, pb.NewServerAddressWithGrpcPort(dn.Id, int(dn.GrpcPort)), c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  43. resp, reqErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{
  44. VolumeId: uint32(vid),
  45. })
  46. if resp != nil {
  47. totalFileCount = resp.FileCount
  48. deletedFileCount = resp.FileDeletedCount
  49. }
  50. return reqErr
  51. })
  52. if err != nil {
  53. fmt.Fprintf(c.writer, "getting number of files for volume id %d from volumes status: %+v\n", vid, err)
  54. }
  55. return totalFileCount, deletedFileCount
  56. }
  57. func (c *commandVolumeCheckDisk) eqVolumeFileCount(a, b *VolumeReplica) (bool, bool) {
  58. var waitGroup sync.WaitGroup
  59. var fileCountA, fileCountB, fileDeletedCountA, fileDeletedCountB uint64
  60. waitGroup.Add(1)
  61. go func() {
  62. defer waitGroup.Done()
  63. fileCountA, fileDeletedCountA = c.getVolumeStatusFileCount(a.info.Id, a.location.dataNode)
  64. }()
  65. waitGroup.Add(1)
  66. go func() {
  67. defer waitGroup.Done()
  68. fileCountB, fileDeletedCountB = c.getVolumeStatusFileCount(b.info.Id, b.location.dataNode)
  69. }()
  70. // Trying to synchronize a remote call to two nodes
  71. waitGroup.Wait()
  72. return fileCountA == fileCountB, fileDeletedCountA == fileDeletedCountB
  73. }
  74. func (c *commandVolumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica, pulseTimeAtSecond int64, syncDeletions, verbose bool) bool {
  75. doSyncDeletedCount := false
  76. if syncDeletions && a.info.DeleteCount != b.info.DeleteCount {
  77. doSyncDeletedCount = true
  78. }
  79. if (a.info.FileCount != b.info.FileCount) || doSyncDeletedCount {
  80. // Do synchronization of volumes, if the modification time was before the last pulsation time
  81. if a.info.ModifiedAtSecond < pulseTimeAtSecond || b.info.ModifiedAtSecond < pulseTimeAtSecond {
  82. return false
  83. }
  84. if eqFileCount, eqDeletedFileCount := c.eqVolumeFileCount(a, b); eqFileCount {
  85. if doSyncDeletedCount && !eqDeletedFileCount {
  86. return false
  87. }
  88. if verbose {
  89. fmt.Fprintf(c.writer, "skipping active volumes %d with the same file counts on %s and %s\n",
  90. a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id)
  91. }
  92. } else {
  93. return false
  94. }
  95. }
  96. return true
  97. }
  98. func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  99. fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  100. slowMode := fsckCommand.Bool("slow", false, "slow mode checks all replicas even file counts are the same")
  101. verbose := fsckCommand.Bool("v", false, "verbose mode")
  102. volumeId := fsckCommand.Uint("volumeId", 0, "the volume id")
  103. applyChanges := fsckCommand.Bool("force", false, "apply the fix")
  104. syncDeletions := fsckCommand.Bool("syncDeleted", false, "sync of deletions the fix")
  105. nonRepairThreshold := fsckCommand.Float64("nonRepairThreshold", 0.3, "repair when missing keys is not more than this limit")
  106. if err = fsckCommand.Parse(args); err != nil {
  107. return nil
  108. }
  109. infoAboutSimulationMode(writer, *applyChanges, "-force")
  110. if err = commandEnv.confirmIsLocked(args); err != nil {
  111. return
  112. }
  113. c.env = commandEnv
  114. c.writer = writer
  115. // collect topology information
  116. pulseTimeAtSecond := time.Now().Unix() - constants.VolumePulseSeconds*2
  117. topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
  118. if err != nil {
  119. return err
  120. }
  121. volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo)
  122. // pick 1 pairs of volume replica
  123. for _, replicas := range volumeReplicas {
  124. if *volumeId > 0 && replicas[0].info.Id != uint32(*volumeId) {
  125. continue
  126. }
  127. slices.SortFunc(replicas, func(a, b *VolumeReplica) int {
  128. return int(b.info.FileCount - a.info.FileCount)
  129. })
  130. for len(replicas) >= 2 {
  131. a, b := replicas[0], replicas[1]
  132. replicas = replicas[1:]
  133. if a.info.ReadOnly || b.info.ReadOnly {
  134. fmt.Fprintf(writer, "skipping readonly volume %d on %s and %s\n",
  135. a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id)
  136. continue
  137. }
  138. if !*slowMode && c.shouldSkipVolume(a, b, pulseTimeAtSecond, *syncDeletions, *verbose) {
  139. continue
  140. }
  141. if err := c.syncTwoReplicas(a, b, *applyChanges, *syncDeletions, *nonRepairThreshold, *verbose); err != nil {
  142. fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err)
  143. }
  144. }
  145. }
  146. return nil
  147. }
  148. func (c *commandVolumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeReplica, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, verbose bool) (err error) {
  149. aHasChanges, bHasChanges := true, true
  150. for aHasChanges || bHasChanges {
  151. if aHasChanges, bHasChanges, err = c.checkBoth(a, b, applyChanges, doSyncDeletions, nonRepairThreshold, verbose); err != nil {
  152. return err
  153. }
  154. }
  155. return nil
  156. }
  157. func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, verbose bool) (aHasChanges bool, bHasChanges bool, err error) {
  158. aDB, bDB := needle_map.NewMemDb(), needle_map.NewMemDb()
  159. defer func() {
  160. aDB.Close()
  161. bDB.Close()
  162. }()
  163. // read index db
  164. readIndexDbCutoffFrom := uint64(time.Now().UnixNano())
  165. if err = readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode), verbose, c.writer, c.env.option.GrpcDialOption); err != nil {
  166. return true, true, fmt.Errorf("readIndexDatabase %s volume %d: %v", a.location.dataNode, a.info.Id, err)
  167. }
  168. if err := readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode), verbose, c.writer, c.env.option.GrpcDialOption); err != nil {
  169. return true, true, fmt.Errorf("readIndexDatabase %s volume %d: %v", b.location.dataNode, b.info.Id, err)
  170. }
  171. // find and make up the differences
  172. if aHasChanges, err = doVolumeCheckDisk(bDB, aDB, b, a, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption); err != nil {
  173. return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode.Id, a.location.dataNode.Id, b.info.Id, err)
  174. }
  175. if bHasChanges, err = doVolumeCheckDisk(aDB, bDB, a, b, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption); err != nil {
  176. return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err)
  177. }
  178. return
  179. }
  180. func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *VolumeReplica, verbose bool, writer io.Writer, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, cutoffFromAtNs uint64, grpcDialOption grpc.DialOption) (hasChanges bool, err error) {
  181. // find missing keys
  182. // hash join, can be more efficient
  183. var missingNeedles []needle_map.NeedleValue
  184. var partiallyDeletedNeedles []needle_map.NeedleValue
  185. var counter int
  186. doCutoffOfLastNeedle := true
  187. minuend.DescendingVisit(func(minuendValue needle_map.NeedleValue) error {
  188. counter++
  189. if subtrahendValue, found := subtrahend.Get(minuendValue.Key); !found {
  190. if minuendValue.Size.IsDeleted() {
  191. return nil
  192. }
  193. if doCutoffOfLastNeedle {
  194. if needleMeta, err := readNeedleMeta(grpcDialOption, pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, minuendValue); err == nil {
  195. // needles older than the cutoff time are not missing yet
  196. if needleMeta.AppendAtNs > cutoffFromAtNs {
  197. return nil
  198. }
  199. doCutoffOfLastNeedle = false
  200. }
  201. }
  202. missingNeedles = append(missingNeedles, minuendValue)
  203. } else {
  204. if minuendValue.Size.IsDeleted() && !subtrahendValue.Size.IsDeleted() {
  205. partiallyDeletedNeedles = append(partiallyDeletedNeedles, minuendValue)
  206. }
  207. if doCutoffOfLastNeedle {
  208. doCutoffOfLastNeedle = false
  209. }
  210. }
  211. return nil
  212. })
  213. fmt.Fprintf(writer, "volume %d %s has %d entries, %s missed %d and partially deleted %d entries\n",
  214. source.info.Id, source.location.dataNode.Id, counter, target.location.dataNode.Id, len(missingNeedles), len(partiallyDeletedNeedles))
  215. if counter == 0 || (len(missingNeedles) == 0 && len(partiallyDeletedNeedles) == 0) {
  216. return false, nil
  217. }
  218. missingNeedlesFraction := float64(len(missingNeedles)) / float64(counter)
  219. if missingNeedlesFraction > nonRepairThreshold {
  220. return false, fmt.Errorf(
  221. "failed to start repair volume %d, percentage of missing keys is greater than the threshold: %.2f > %.2f",
  222. source.info.Id, missingNeedlesFraction, nonRepairThreshold)
  223. }
  224. for _, needleValue := range missingNeedles {
  225. needleBlob, err := readSourceNeedleBlob(grpcDialOption, pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, needleValue)
  226. if err != nil {
  227. return hasChanges, err
  228. }
  229. if !applyChanges {
  230. continue
  231. }
  232. if verbose {
  233. fmt.Fprintf(writer, "read %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id)
  234. }
  235. hasChanges = true
  236. if err = writeNeedleBlobToTarget(grpcDialOption, pb.NewServerAddressFromDataNode(target.location.dataNode), source.info.Id, needleValue, needleBlob); err != nil {
  237. return hasChanges, err
  238. }
  239. }
  240. if doSyncDeletions && applyChanges && len(partiallyDeletedNeedles) > 0 {
  241. var fidList []string
  242. for _, needleValue := range partiallyDeletedNeedles {
  243. fidList = append(fidList, needleValue.Key.FileId(source.info.Id))
  244. if verbose {
  245. fmt.Fprintf(writer, "delete %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id)
  246. }
  247. }
  248. deleteResults, deleteErr := operation.DeleteFileIdsAtOneVolumeServer(
  249. pb.NewServerAddressFromDataNode(target.location.dataNode),
  250. grpcDialOption, fidList, false)
  251. if deleteErr != nil {
  252. return hasChanges, deleteErr
  253. }
  254. for _, deleteResult := range deleteResults {
  255. if deleteResult.Status == http.StatusAccepted && deleteResult.Size > 0 {
  256. hasChanges = true
  257. return
  258. }
  259. }
  260. }
  261. return
  262. }
  263. func readSourceNeedleBlob(grpcDialOption grpc.DialOption, sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) {
  264. err = operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  265. resp, err := client.ReadNeedleBlob(context.Background(), &volume_server_pb.ReadNeedleBlobRequest{
  266. VolumeId: volumeId,
  267. Offset: needleValue.Offset.ToActualOffset(),
  268. Size: int32(needleValue.Size),
  269. })
  270. if err != nil {
  271. return err
  272. }
  273. needleBlob = resp.NeedleBlob
  274. return nil
  275. })
  276. return
  277. }
  278. func writeNeedleBlobToTarget(grpcDialOption grpc.DialOption, targetVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue, needleBlob []byte) error {
  279. return operation.WithVolumeServerClient(false, targetVolumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  280. _, err := client.WriteNeedleBlob(context.Background(), &volume_server_pb.WriteNeedleBlobRequest{
  281. VolumeId: volumeId,
  282. NeedleId: uint64(needleValue.Key),
  283. Size: int32(needleValue.Size),
  284. NeedleBlob: needleBlob,
  285. })
  286. return err
  287. })
  288. }
  289. func readIndexDatabase(db *needle_map.MemDb, collection string, volumeId uint32, volumeServer pb.ServerAddress, verbose bool, writer io.Writer, grpcDialOption grpc.DialOption) error {
  290. var buf bytes.Buffer
  291. if err := copyVolumeIndexFile(collection, volumeId, volumeServer, &buf, verbose, writer, grpcDialOption); err != nil {
  292. return err
  293. }
  294. if verbose {
  295. fmt.Fprintf(writer, "load collection %s volume %d index size %d from %s ...\n", collection, volumeId, buf.Len(), volumeServer)
  296. }
  297. return db.LoadFilterFromReaderAt(bytes.NewReader(buf.Bytes()), true, false)
  298. }
  299. func copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.ServerAddress, buf *bytes.Buffer, verbose bool, writer io.Writer, grpcDialOption grpc.DialOption) error {
  300. return operation.WithVolumeServerClient(true, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  301. ext := ".idx"
  302. copyFileClient, err := volumeServerClient.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
  303. VolumeId: volumeId,
  304. Ext: ".idx",
  305. CompactionRevision: math.MaxUint32,
  306. StopOffset: math.MaxInt64,
  307. Collection: collection,
  308. IsEcVolume: false,
  309. IgnoreSourceFileNotFound: false,
  310. })
  311. if err != nil {
  312. return fmt.Errorf("failed to start copying volume %d%s: %v", volumeId, ext, err)
  313. }
  314. err = writeToBuffer(copyFileClient, buf)
  315. if err != nil {
  316. return fmt.Errorf("failed to copy %d%s from %s: %v", volumeId, ext, volumeServer, err)
  317. }
  318. return nil
  319. })
  320. }
  321. func writeToBuffer(client volume_server_pb.VolumeServer_CopyFileClient, buf *bytes.Buffer) error {
  322. for {
  323. resp, receiveErr := client.Recv()
  324. if receiveErr == io.EOF {
  325. break
  326. }
  327. if receiveErr != nil {
  328. return fmt.Errorf("receiving: %v", receiveErr)
  329. }
  330. buf.Write(resp.FileContent)
  331. }
  332. return nil
  333. }