You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

131 lines
4.5 KiB

9 years ago
6 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package storage
  2. import (
  3. "fmt"
  4. "os"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/storage/backend"
  7. "github.com/chrislusf/seaweedfs/weed/storage/idx"
  8. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  9. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. )
  12. func CheckAndFixVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, err error) {
  13. var indexSize int64
  14. if indexSize, err = verifyIndexFileIntegrity(indexFile); err != nil {
  15. return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), err)
  16. }
  17. if indexSize == 0 {
  18. return 0, nil
  19. }
  20. for i := 1; i <= 10; i++ {
  21. // check and fix last 10 entries
  22. lastAppendAtNs, err = doCheckAndFixVolumeData(v, indexFile, indexSize-int64(i)*NeedleMapEntrySize)
  23. if err != ErrorSizeMismatch {
  24. break
  25. }
  26. }
  27. return
  28. }
  29. func doCheckAndFixVolumeData(v *Volume, indexFile *os.File, indexOffset int64) (lastAppendAtNs uint64, err error) {
  30. var lastIdxEntry []byte
  31. if lastIdxEntry, err = readIndexEntryAtOffset(indexFile, indexOffset); err != nil {
  32. return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), err)
  33. }
  34. key, offset, size := idx.IdxFileEntry(lastIdxEntry)
  35. if offset.IsZero() {
  36. return 0, nil
  37. }
  38. if size < 0 {
  39. // read the deletion entry
  40. if lastAppendAtNs, err = verifyDeletedNeedleIntegrity(v.DataBackend, v.Version(), key); err != nil {
  41. return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), err)
  42. }
  43. } else {
  44. if lastAppendAtNs, err = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); err != nil {
  45. return lastAppendAtNs, err
  46. }
  47. }
  48. return lastAppendAtNs, nil
  49. }
  50. func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) {
  51. if indexSize, err = util.GetFileSize(indexFile); err == nil {
  52. if indexSize%NeedleMapEntrySize != 0 {
  53. err = fmt.Errorf("index file's size is %d bytes, maybe corrupted", indexSize)
  54. }
  55. }
  56. return
  57. }
  58. func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err error) {
  59. if offset < 0 {
  60. err = fmt.Errorf("offset %d for index file is invalid", offset)
  61. return
  62. }
  63. bytes = make([]byte, NeedleMapEntrySize)
  64. _, err = indexFile.ReadAt(bytes, offset)
  65. return
  66. }
  67. func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, offset int64, key NeedleId, size Size) (lastAppendAtNs uint64, err error) {
  68. n, _, _, err := needle.ReadNeedleHeader(datFile, v, offset)
  69. if err != nil {
  70. return 0, fmt.Errorf("read %s at %d", datFile.Name(), offset)
  71. }
  72. if n.Size != size {
  73. return 0, ErrorSizeMismatch
  74. }
  75. if v == needle.Version3 {
  76. bytes := make([]byte, TimestampSize)
  77. _, err = datFile.ReadAt(bytes, offset+NeedleHeaderSize+int64(size)+needle.NeedleChecksumSize)
  78. if err != nil {
  79. return 0, fmt.Errorf("check %s entry offset %d size %d: %v", datFile.Name(), offset, size, err)
  80. }
  81. n.AppendAtNs = util.BytesToUint64(bytes)
  82. fileTailOffset := offset + needle.GetActualSize(size, v)
  83. fileSize, _, err := datFile.GetStat()
  84. if err != nil {
  85. return 0, fmt.Errorf("stat file %s: %v", datFile.Name(), err)
  86. }
  87. if fileSize == fileTailOffset {
  88. return n.AppendAtNs, nil
  89. }
  90. if fileSize > fileTailOffset {
  91. glog.Warningf("Truncate %s from %d bytes to %d bytes!", datFile.Name(), fileSize, fileTailOffset)
  92. err = datFile.Truncate(fileTailOffset)
  93. if err == nil {
  94. return n.AppendAtNs, nil
  95. }
  96. return n.AppendAtNs, fmt.Errorf("truncate file %s: %v", datFile.Name(), err)
  97. }
  98. glog.Warningf("data file %s has %d bytes, less than expected %d bytes!", datFile.Name(), fileSize, fileTailOffset)
  99. }
  100. if err = n.ReadData(datFile, offset, size, v); err != nil {
  101. return n.AppendAtNs, fmt.Errorf("read data [%d,%d) : %v", offset, offset+int64(size), err)
  102. }
  103. if n.Id != key {
  104. return n.AppendAtNs, fmt.Errorf("index key %#x does not match needle's Id %#x", key, n.Id)
  105. }
  106. return n.AppendAtNs, err
  107. }
  108. func verifyDeletedNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, key NeedleId) (lastAppendAtNs uint64, err error) {
  109. n := new(needle.Needle)
  110. size := n.DiskSize(v)
  111. var fileSize int64
  112. fileSize, _, err = datFile.GetStat()
  113. if err != nil {
  114. return 0, fmt.Errorf("GetStat: %v", err)
  115. }
  116. if err = n.ReadData(datFile, fileSize-size, Size(0), v); err != nil {
  117. return n.AppendAtNs, fmt.Errorf("read data [%d,%d) : %v", fileSize-size, size, err)
  118. }
  119. if n.Id != key {
  120. return n.AppendAtNs, fmt.Errorf("index key %#x does not match needle's Id %#x", key, n.Id)
  121. }
  122. return n.AppendAtNs, err
  123. }