You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

398 lines
15 KiB

3 years ago
4 years ago
4 years ago
4 years ago
4 years ago
6 months ago
  1. package shell
  2. import (
  3. "bytes"
  4. "context"
  5. "flag"
  6. "fmt"
  7. "io"
  8. "math"
  9. "net/http"
  10. "sync"
  11. "time"
  12. "github.com/seaweedfs/seaweedfs/weed/operation"
  13. "github.com/seaweedfs/seaweedfs/weed/pb"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  15. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  16. "github.com/seaweedfs/seaweedfs/weed/server/constants"
  17. "github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
  18. "golang.org/x/exp/slices"
  19. "google.golang.org/grpc"
  20. )
  21. func init() {
  22. Commands = append(Commands, &commandVolumeCheckDisk{})
  23. }
  24. type commandVolumeCheckDisk struct {
  25. env *CommandEnv
  26. writer io.Writer
  27. }
  28. func (c *commandVolumeCheckDisk) Name() string {
  29. return "volume.check.disk"
  30. }
  31. func (c *commandVolumeCheckDisk) Help() string {
  32. return `check all replicated volumes to find and fix inconsistencies. It is optional and resource intensive.
  33. How it works:
  34. find all volumes that are replicated
  35. for each volume id, if there are more than 2 replicas, find one pair with the largest 2 in file count.
  36. for the pair volume A and B
  37. append entries in A and not in B to B
  38. append entries in B and not in A to A
  39. `
  40. }
  41. func (c *commandVolumeCheckDisk) getVolumeStatusFileCount(vid uint32, dn *master_pb.DataNodeInfo) (totalFileCount, deletedFileCount uint64) {
  42. err := operation.WithVolumeServerClient(false, pb.NewServerAddressWithGrpcPort(dn.Id, int(dn.GrpcPort)), c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  43. resp, reqErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{
  44. VolumeId: uint32(vid),
  45. })
  46. if resp != nil {
  47. totalFileCount = resp.FileCount
  48. deletedFileCount = resp.FileDeletedCount
  49. }
  50. return reqErr
  51. })
  52. if err != nil {
  53. fmt.Fprintf(c.writer, "getting number of files for volume id %d from volumes status: %+v\n", vid, err)
  54. }
  55. return totalFileCount, deletedFileCount
  56. }
  57. func (c *commandVolumeCheckDisk) eqVolumeFileCount(a, b *VolumeReplica) (bool, bool) {
  58. var waitGroup sync.WaitGroup
  59. var fileCountA, fileCountB, fileDeletedCountA, fileDeletedCountB uint64
  60. waitGroup.Add(1)
  61. go func() {
  62. defer waitGroup.Done()
  63. fileCountA, fileDeletedCountA = c.getVolumeStatusFileCount(a.info.Id, a.location.dataNode)
  64. }()
  65. waitGroup.Add(1)
  66. go func() {
  67. defer waitGroup.Done()
  68. fileCountB, fileDeletedCountB = c.getVolumeStatusFileCount(b.info.Id, b.location.dataNode)
  69. }()
  70. // Trying to synchronize a remote call to two nodes
  71. waitGroup.Wait()
  72. return fileCountA == fileCountB, fileDeletedCountA == fileDeletedCountB
  73. }
  74. func (c *commandVolumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica, pulseTimeAtSecond int64, syncDeletions, verbose bool) bool {
  75. doSyncDeletedCount := false
  76. if syncDeletions && a.info.DeleteCount != b.info.DeleteCount {
  77. doSyncDeletedCount = true
  78. }
  79. if (a.info.FileCount != b.info.FileCount) || doSyncDeletedCount {
  80. // Do synchronization of volumes, if the modification time was before the last pulsation time
  81. if a.info.ModifiedAtSecond < pulseTimeAtSecond || b.info.ModifiedAtSecond < pulseTimeAtSecond {
  82. return false
  83. }
  84. if eqFileCount, eqDeletedFileCount := c.eqVolumeFileCount(a, b); eqFileCount {
  85. if doSyncDeletedCount && !eqDeletedFileCount {
  86. return false
  87. }
  88. if verbose {
  89. fmt.Fprintf(c.writer, "skipping active volumes %d with the same file counts on %s and %s\n",
  90. a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id)
  91. }
  92. } else {
  93. return false
  94. }
  95. }
  96. return true
  97. }
  98. func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  99. fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  100. slowMode := fsckCommand.Bool("slow", false, "slow mode checks all replicas even file counts are the same")
  101. verbose := fsckCommand.Bool("v", false, "verbose mode")
  102. volumeId := fsckCommand.Uint("volumeId", 0, "the volume id")
  103. applyChanges := fsckCommand.Bool("force", false, "apply the fix")
  104. syncDeletions := fsckCommand.Bool("syncDeleted", false, "sync of deletions the fix")
  105. nonRepairThreshold := fsckCommand.Float64("nonRepairThreshold", 0.3, "repair when missing keys is not more than this limit")
  106. if err = fsckCommand.Parse(args); err != nil {
  107. return nil
  108. }
  109. infoAboutSimulationMode(writer, *applyChanges, "-force")
  110. if err = commandEnv.confirmIsLocked(args); err != nil {
  111. return
  112. }
  113. c.env = commandEnv
  114. c.writer = writer
  115. // collect topology information
  116. pulseTimeAtSecond := time.Now().Unix() - constants.VolumePulseSeconds*2
  117. topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
  118. if err != nil {
  119. return err
  120. }
  121. volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo)
  122. // pick 1 pairs of volume replica
  123. for _, replicas := range volumeReplicas {
  124. if *volumeId > 0 && replicas[0].info.Id != uint32(*volumeId) {
  125. continue
  126. }
  127. // filter readonly replica
  128. var writableReplicas []*VolumeReplica
  129. for _, replica := range replicas {
  130. if replica.info.ReadOnly {
  131. fmt.Fprintf(writer, "skipping readonly volume %d on %s\n", replica.info.Id, replica.location.dataNode.Id)
  132. } else {
  133. writableReplicas = append(writableReplicas, replica)
  134. }
  135. }
  136. slices.SortFunc(writableReplicas, func(a, b *VolumeReplica) int {
  137. return int(b.info.FileCount - a.info.FileCount)
  138. })
  139. for len(writableReplicas) >= 2 {
  140. a, b := writableReplicas[0], writableReplicas[1]
  141. if !*slowMode && c.shouldSkipVolume(a, b, pulseTimeAtSecond, *syncDeletions, *verbose) {
  142. // always choose the larger volume to be the source
  143. writableReplicas = append(replicas[:1], writableReplicas[2:]...)
  144. continue
  145. }
  146. if err := c.syncTwoReplicas(a, b, *applyChanges, *syncDeletions, *nonRepairThreshold, *verbose); err != nil {
  147. fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err)
  148. }
  149. // always choose the larger volume to be the source
  150. if a.info.FileCount > b.info.FileCount {
  151. writableReplicas = append(writableReplicas[:1], writableReplicas[2:]...)
  152. } else {
  153. writableReplicas = writableReplicas[1:]
  154. }
  155. }
  156. }
  157. return nil
  158. }
  159. func (c *commandVolumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeReplica, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, verbose bool) (err error) {
  160. aHasChanges, bHasChanges := true, true
  161. for aHasChanges || bHasChanges {
  162. if aHasChanges, bHasChanges, err = c.checkBoth(a, b, applyChanges, doSyncDeletions, nonRepairThreshold, verbose); err != nil {
  163. return err
  164. }
  165. }
  166. return nil
  167. }
  168. func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, verbose bool) (aHasChanges bool, bHasChanges bool, err error) {
  169. aDB, bDB := needle_map.NewMemDb(), needle_map.NewMemDb()
  170. defer func() {
  171. aDB.Close()
  172. bDB.Close()
  173. }()
  174. // read index db
  175. readIndexDbCutoffFrom := uint64(time.Now().UnixNano())
  176. if err = readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode), verbose, c.writer, c.env.option.GrpcDialOption); err != nil {
  177. return true, true, fmt.Errorf("readIndexDatabase %s volume %d: %v", a.location.dataNode, a.info.Id, err)
  178. }
  179. if err := readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode), verbose, c.writer, c.env.option.GrpcDialOption); err != nil {
  180. return true, true, fmt.Errorf("readIndexDatabase %s volume %d: %v", b.location.dataNode, b.info.Id, err)
  181. }
  182. // find and make up the differences
  183. aHasChanges, err1 := doVolumeCheckDisk(bDB, aDB, b, a, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption)
  184. bHasChanges, err2 := doVolumeCheckDisk(aDB, bDB, a, b, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption)
  185. if err1 != nil {
  186. return aHasChanges, bHasChanges, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode.Id, a.location.dataNode.Id, b.info.Id, err1)
  187. }
  188. if err2 != nil {
  189. return aHasChanges, bHasChanges, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err2)
  190. }
  191. return aHasChanges, bHasChanges, nil
  192. }
  193. func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *VolumeReplica, verbose bool, writer io.Writer, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, cutoffFromAtNs uint64, grpcDialOption grpc.DialOption) (hasChanges bool, err error) {
  194. // find missing keys
  195. // hash join, can be more efficient
  196. var missingNeedles []needle_map.NeedleValue
  197. var partiallyDeletedNeedles []needle_map.NeedleValue
  198. var counter int
  199. doCutoffOfLastNeedle := true
  200. minuend.DescendingVisit(func(minuendValue needle_map.NeedleValue) error {
  201. counter++
  202. if subtrahendValue, found := subtrahend.Get(minuendValue.Key); !found {
  203. if minuendValue.Size.IsDeleted() {
  204. return nil
  205. }
  206. if doCutoffOfLastNeedle {
  207. if needleMeta, err := readNeedleMeta(grpcDialOption, pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, minuendValue); err == nil {
  208. // needles older than the cutoff time are not missing yet
  209. if needleMeta.AppendAtNs > cutoffFromAtNs {
  210. return nil
  211. }
  212. doCutoffOfLastNeedle = false
  213. }
  214. }
  215. missingNeedles = append(missingNeedles, minuendValue)
  216. } else {
  217. if minuendValue.Size.IsDeleted() && !subtrahendValue.Size.IsDeleted() {
  218. partiallyDeletedNeedles = append(partiallyDeletedNeedles, minuendValue)
  219. }
  220. if doCutoffOfLastNeedle {
  221. doCutoffOfLastNeedle = false
  222. }
  223. }
  224. return nil
  225. })
  226. fmt.Fprintf(writer, "volume %d %s has %d entries, %s missed %d and partially deleted %d entries\n",
  227. source.info.Id, source.location.dataNode.Id, counter, target.location.dataNode.Id, len(missingNeedles), len(partiallyDeletedNeedles))
  228. if counter == 0 || (len(missingNeedles) == 0 && len(partiallyDeletedNeedles) == 0) {
  229. return false, nil
  230. }
  231. missingNeedlesFraction := float64(len(missingNeedles)) / float64(counter)
  232. if missingNeedlesFraction > nonRepairThreshold {
  233. return false, fmt.Errorf(
  234. "failed to start repair volume %d, percentage of missing keys is greater than the threshold: %.2f > %.2f",
  235. source.info.Id, missingNeedlesFraction, nonRepairThreshold)
  236. }
  237. for _, needleValue := range missingNeedles {
  238. needleBlob, err := readSourceNeedleBlob(grpcDialOption, pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, needleValue)
  239. if err != nil {
  240. return hasChanges, err
  241. }
  242. if !applyChanges {
  243. continue
  244. }
  245. if verbose {
  246. fmt.Fprintf(writer, "read %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id)
  247. }
  248. hasChanges = true
  249. if err = writeNeedleBlobToTarget(grpcDialOption, pb.NewServerAddressFromDataNode(target.location.dataNode), source.info.Id, needleValue, needleBlob); err != nil {
  250. return hasChanges, err
  251. }
  252. }
  253. if doSyncDeletions && applyChanges && len(partiallyDeletedNeedles) > 0 {
  254. var fidList []string
  255. for _, needleValue := range partiallyDeletedNeedles {
  256. fidList = append(fidList, needleValue.Key.FileId(source.info.Id))
  257. if verbose {
  258. fmt.Fprintf(writer, "delete %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id)
  259. }
  260. }
  261. deleteResults, deleteErr := operation.DeleteFileIdsAtOneVolumeServer(
  262. pb.NewServerAddressFromDataNode(target.location.dataNode),
  263. grpcDialOption, fidList, false)
  264. if deleteErr != nil {
  265. return hasChanges, deleteErr
  266. }
  267. for _, deleteResult := range deleteResults {
  268. if deleteResult.Status == http.StatusAccepted && deleteResult.Size > 0 {
  269. hasChanges = true
  270. return
  271. }
  272. }
  273. }
  274. return
  275. }
  276. func readSourceNeedleBlob(grpcDialOption grpc.DialOption, sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) {
  277. err = operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  278. resp, err := client.ReadNeedleBlob(context.Background(), &volume_server_pb.ReadNeedleBlobRequest{
  279. VolumeId: volumeId,
  280. Offset: needleValue.Offset.ToActualOffset(),
  281. Size: int32(needleValue.Size),
  282. })
  283. if err != nil {
  284. return err
  285. }
  286. needleBlob = resp.NeedleBlob
  287. return nil
  288. })
  289. return
  290. }
  291. func writeNeedleBlobToTarget(grpcDialOption grpc.DialOption, targetVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue, needleBlob []byte) error {
  292. return operation.WithVolumeServerClient(false, targetVolumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  293. _, err := client.WriteNeedleBlob(context.Background(), &volume_server_pb.WriteNeedleBlobRequest{
  294. VolumeId: volumeId,
  295. NeedleId: uint64(needleValue.Key),
  296. Size: int32(needleValue.Size),
  297. NeedleBlob: needleBlob,
  298. })
  299. return err
  300. })
  301. }
  302. func readIndexDatabase(db *needle_map.MemDb, collection string, volumeId uint32, volumeServer pb.ServerAddress, verbose bool, writer io.Writer, grpcDialOption grpc.DialOption) error {
  303. var buf bytes.Buffer
  304. if err := copyVolumeIndexFile(collection, volumeId, volumeServer, &buf, verbose, writer, grpcDialOption); err != nil {
  305. return err
  306. }
  307. if verbose {
  308. fmt.Fprintf(writer, "load collection %s volume %d index size %d from %s ...\n", collection, volumeId, buf.Len(), volumeServer)
  309. }
  310. return db.LoadFilterFromReaderAt(bytes.NewReader(buf.Bytes()), true, false)
  311. }
  312. func copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.ServerAddress, buf *bytes.Buffer, verbose bool, writer io.Writer, grpcDialOption grpc.DialOption) error {
  313. return operation.WithVolumeServerClient(true, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  314. ext := ".idx"
  315. copyFileClient, err := volumeServerClient.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
  316. VolumeId: volumeId,
  317. Ext: ".idx",
  318. CompactionRevision: math.MaxUint32,
  319. StopOffset: math.MaxInt64,
  320. Collection: collection,
  321. IsEcVolume: false,
  322. IgnoreSourceFileNotFound: false,
  323. })
  324. if err != nil {
  325. return fmt.Errorf("failed to start copying volume %d%s: %v", volumeId, ext, err)
  326. }
  327. err = writeToBuffer(copyFileClient, buf)
  328. if err != nil {
  329. return fmt.Errorf("failed to copy %d%s from %s: %v", volumeId, ext, volumeServer, err)
  330. }
  331. return nil
  332. })
  333. }
  334. func writeToBuffer(client volume_server_pb.VolumeServer_CopyFileClient, buf *bytes.Buffer) error {
  335. for {
  336. resp, receiveErr := client.Recv()
  337. if receiveErr == io.EOF {
  338. break
  339. }
  340. if receiveErr != nil {
  341. return fmt.Errorf("receiving: %v", receiveErr)
  342. }
  343. buf.Write(resp.FileContent)
  344. }
  345. return nil
  346. }