You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

225 lines
6.1 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package erasure_coding
  2. import (
  3. "errors"
  4. "fmt"
  5. "math"
  6. "os"
  7. "sort"
  8. "sync"
  9. "time"
  10. "github.com/chrislusf/seaweedfs/weed/pb"
  11. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  12. "github.com/chrislusf/seaweedfs/weed/storage/idx"
  13. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  14. "github.com/chrislusf/seaweedfs/weed/storage/types"
  15. )
  16. var (
  17. NotFoundError = errors.New("needle not found")
  18. )
  19. type EcVolume struct {
  20. VolumeId needle.VolumeId
  21. Collection string
  22. dir string
  23. ecxFile *os.File
  24. ecxFileSize int64
  25. ecxCreatedAt time.Time
  26. Shards []*EcVolumeShard
  27. ShardLocations map[ShardId][]string
  28. ShardLocationsRefreshTime time.Time
  29. ShardLocationsLock sync.RWMutex
  30. Version needle.Version
  31. ecjFile *os.File
  32. ecjFileAccessLock sync.Mutex
  33. }
  34. func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
  35. ev = &EcVolume{dir: dir, Collection: collection, VolumeId: vid}
  36. baseFileName := EcShardFileName(collection, dir, int(vid))
  37. // open ecx file
  38. if ev.ecxFile, err = os.OpenFile(baseFileName+".ecx", os.O_RDWR, 0644); err != nil {
  39. return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", baseFileName, err)
  40. }
  41. ecxFi, statErr := ev.ecxFile.Stat()
  42. if statErr != nil {
  43. return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", baseFileName, statErr)
  44. }
  45. ev.ecxFileSize = ecxFi.Size()
  46. ev.ecxCreatedAt = ecxFi.ModTime()
  47. // open ecj file
  48. if ev.ecjFile, err = os.OpenFile(baseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil {
  49. return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", baseFileName, err)
  50. }
  51. // read volume info
  52. ev.Version = needle.Version3
  53. if volumeInfo, found := pb.MaybeLoadVolumeInfo(baseFileName + ".vif"); found {
  54. ev.Version = needle.Version(volumeInfo.Version)
  55. }
  56. ev.ShardLocations = make(map[ShardId][]string)
  57. return
  58. }
  59. func (ev *EcVolume) AddEcVolumeShard(ecVolumeShard *EcVolumeShard) bool {
  60. for _, s := range ev.Shards {
  61. if s.ShardId == ecVolumeShard.ShardId {
  62. return false
  63. }
  64. }
  65. ev.Shards = append(ev.Shards, ecVolumeShard)
  66. sort.Slice(ev.Shards, func(i, j int) bool {
  67. return ev.Shards[i].VolumeId < ev.Shards[j].VolumeId ||
  68. ev.Shards[i].VolumeId == ev.Shards[j].VolumeId && ev.Shards[i].ShardId < ev.Shards[j].ShardId
  69. })
  70. return true
  71. }
  72. func (ev *EcVolume) DeleteEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, deleted bool) {
  73. foundPosition := -1
  74. for i, s := range ev.Shards {
  75. if s.ShardId == shardId {
  76. foundPosition = i
  77. }
  78. }
  79. if foundPosition < 0 {
  80. return nil, false
  81. }
  82. ecVolumeShard = ev.Shards[foundPosition]
  83. ev.Shards = append(ev.Shards[:foundPosition], ev.Shards[foundPosition+1:]...)
  84. return ecVolumeShard, true
  85. }
  86. func (ev *EcVolume) FindEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, found bool) {
  87. for _, s := range ev.Shards {
  88. if s.ShardId == shardId {
  89. return s, true
  90. }
  91. }
  92. return nil, false
  93. }
  94. func (ev *EcVolume) Close() {
  95. for _, s := range ev.Shards {
  96. s.Close()
  97. }
  98. if ev.ecjFile != nil {
  99. ev.ecjFileAccessLock.Lock()
  100. _ = ev.ecjFile.Close()
  101. ev.ecjFile = nil
  102. ev.ecjFileAccessLock.Unlock()
  103. }
  104. if ev.ecxFile != nil {
  105. _ = ev.ecxFile.Close()
  106. ev.ecxFile = nil
  107. }
  108. }
  109. func (ev *EcVolume) Destroy() {
  110. ev.Close()
  111. for _, s := range ev.Shards {
  112. s.Destroy()
  113. }
  114. os.Remove(ev.FileName() + ".ecx")
  115. os.Remove(ev.FileName() + ".ecj")
  116. os.Remove(ev.FileName() + ".vif")
  117. }
  118. func (ev *EcVolume) FileName() string {
  119. return EcShardFileName(ev.Collection, ev.dir, int(ev.VolumeId))
  120. }
  121. func (ev *EcVolume) ShardSize() int64 {
  122. if len(ev.Shards) > 0 {
  123. return ev.Shards[0].Size()
  124. }
  125. return 0
  126. }
  127. func (ev *EcVolume) CreatedAt() time.Time {
  128. return ev.ecxCreatedAt
  129. }
  130. func (ev *EcVolume) ShardIdList() (shardIds []ShardId) {
  131. for _, s := range ev.Shards {
  132. shardIds = append(shardIds, s.ShardId)
  133. }
  134. return
  135. }
  136. func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.VolumeEcShardInformationMessage) {
  137. prevVolumeId := needle.VolumeId(math.MaxUint32)
  138. var m *master_pb.VolumeEcShardInformationMessage
  139. for _, s := range ev.Shards {
  140. if s.VolumeId != prevVolumeId {
  141. m = &master_pb.VolumeEcShardInformationMessage{
  142. Id: uint32(s.VolumeId),
  143. Collection: s.Collection,
  144. }
  145. messages = append(messages, m)
  146. }
  147. prevVolumeId = s.VolumeId
  148. m.EcIndexBits = uint32(ShardBits(m.EcIndexBits).AddShardId(s.ShardId))
  149. }
  150. return
  151. }
  152. func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.Version) (offset types.Offset, size uint32, intervals []Interval, err error) {
  153. // find the needle from ecx file
  154. offset, size, err = ev.FindNeedleFromEcx(needleId)
  155. if err != nil {
  156. return types.Offset{}, 0, nil, fmt.Errorf("FindNeedleFromEcx: %v", err)
  157. }
  158. shard := ev.Shards[0]
  159. // calculate the locations in the ec shards
  160. intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToAcutalOffset(), uint32(needle.GetActualSize(size, version)))
  161. return
  162. }
  163. func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) {
  164. return SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, nil)
  165. }
  166. func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size uint32, err error) {
  167. var key types.NeedleId
  168. buf := make([]byte, types.NeedleMapEntrySize)
  169. l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize
  170. for l < h {
  171. m := (l + h) / 2
  172. if _, err := ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
  173. return types.Offset{}, types.TombstoneFileSize, fmt.Errorf("ecx file %d read at %d: %v", ecxFileSize, m*types.NeedleMapEntrySize, err)
  174. }
  175. key, offset, size = idx.IdxFileEntry(buf)
  176. if key == needleId {
  177. if processNeedleFn != nil {
  178. err = processNeedleFn(ecxFile, m*types.NeedleHeaderSize)
  179. }
  180. return
  181. }
  182. if key < needleId {
  183. l = m + 1
  184. } else {
  185. h = m
  186. }
  187. }
  188. err = NotFoundError
  189. return
  190. }