You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

271 lines
7.8 KiB

6 years ago
6 months ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package erasure_coding
  2. import (
  3. "errors"
  4. "fmt"
  5. "math"
  6. "os"
  7. "sync"
  8. "time"
  9. "golang.org/x/exp/slices"
  10. "github.com/seaweedfs/seaweedfs/weed/pb"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  13. "github.com/seaweedfs/seaweedfs/weed/storage/idx"
  14. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  15. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  16. "github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
  17. )
  // NotFoundError is the sentinel returned by SearchNeedleFromSortedIndex when
  // the requested needle id is not present in the index.
  var NotFoundError = errors.New("needle not found")
  21. type EcVolume struct {
  22. VolumeId needle.VolumeId
  23. Collection string
  24. dir string
  25. dirIdx string
  26. ecxFile *os.File
  27. ecxFileSize int64
  28. ecxCreatedAt time.Time
  29. Shards []*EcVolumeShard
  30. ShardLocations map[ShardId][]pb.ServerAddress
  31. ShardLocationsRefreshTime time.Time
  32. ShardLocationsLock sync.RWMutex
  33. Version needle.Version
  34. ecjFile *os.File
  35. ecjFileAccessLock sync.Mutex
  36. diskType types.DiskType
  37. datFileSize int64
  38. }
  39. func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
  40. ev = &EcVolume{dir: dir, dirIdx: dirIdx, Collection: collection, VolumeId: vid, diskType: diskType}
  41. dataBaseFileName := EcShardFileName(collection, dir, int(vid))
  42. indexBaseFileName := EcShardFileName(collection, dirIdx, int(vid))
  43. // open ecx file
  44. if ev.ecxFile, err = os.OpenFile(indexBaseFileName+".ecx", os.O_RDWR, 0644); err != nil {
  45. return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", indexBaseFileName, err)
  46. }
  47. ecxFi, statErr := ev.ecxFile.Stat()
  48. if statErr != nil {
  49. _ = ev.ecxFile.Close()
  50. return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", indexBaseFileName, statErr)
  51. }
  52. ev.ecxFileSize = ecxFi.Size()
  53. ev.ecxCreatedAt = ecxFi.ModTime()
  54. // open ecj file
  55. if ev.ecjFile, err = os.OpenFile(indexBaseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil {
  56. return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", indexBaseFileName, err)
  57. }
  58. // read volume info
  59. ev.Version = needle.Version3
  60. if volumeInfo, _, found, _ := volume_info.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found {
  61. ev.Version = needle.Version(volumeInfo.Version)
  62. ev.datFileSize = volumeInfo.DatFileSize
  63. } else {
  64. volume_info.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
  65. }
  66. ev.ShardLocations = make(map[ShardId][]pb.ServerAddress)
  67. return
  68. }
  69. func (ev *EcVolume) AddEcVolumeShard(ecVolumeShard *EcVolumeShard) bool {
  70. for _, s := range ev.Shards {
  71. if s.ShardId == ecVolumeShard.ShardId {
  72. return false
  73. }
  74. }
  75. ev.Shards = append(ev.Shards, ecVolumeShard)
  76. slices.SortFunc(ev.Shards, func(a, b *EcVolumeShard) int {
  77. if a.VolumeId != b.VolumeId {
  78. return int(a.VolumeId - b.VolumeId)
  79. }
  80. return int(a.ShardId - b.ShardId)
  81. })
  82. return true
  83. }
  84. func (ev *EcVolume) DeleteEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, deleted bool) {
  85. foundPosition := -1
  86. for i, s := range ev.Shards {
  87. if s.ShardId == shardId {
  88. foundPosition = i
  89. }
  90. }
  91. if foundPosition < 0 {
  92. return nil, false
  93. }
  94. ecVolumeShard = ev.Shards[foundPosition]
  95. ev.Shards = append(ev.Shards[:foundPosition], ev.Shards[foundPosition+1:]...)
  96. return ecVolumeShard, true
  97. }
  98. func (ev *EcVolume) FindEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, found bool) {
  99. for _, s := range ev.Shards {
  100. if s.ShardId == shardId {
  101. return s, true
  102. }
  103. }
  104. return nil, false
  105. }
  106. func (ev *EcVolume) Close() {
  107. for _, s := range ev.Shards {
  108. s.Close()
  109. }
  110. if ev.ecjFile != nil {
  111. ev.ecjFileAccessLock.Lock()
  112. _ = ev.ecjFile.Close()
  113. ev.ecjFile = nil
  114. ev.ecjFileAccessLock.Unlock()
  115. }
  116. if ev.ecxFile != nil {
  117. _ = ev.ecxFile.Sync()
  118. _ = ev.ecxFile.Close()
  119. ev.ecxFile = nil
  120. }
  121. }
  122. func (ev *EcVolume) Destroy() {
  123. ev.Close()
  124. for _, s := range ev.Shards {
  125. s.Destroy()
  126. }
  127. os.Remove(ev.FileName(".ecx"))
  128. os.Remove(ev.FileName(".ecj"))
  129. os.Remove(ev.FileName(".vif"))
  130. }
  131. func (ev *EcVolume) FileName(ext string) string {
  132. switch ext {
  133. case ".ecx", ".ecj":
  134. return ev.IndexBaseFileName() + ext
  135. }
  136. // .vif
  137. return ev.DataBaseFileName() + ext
  138. }
  139. func (ev *EcVolume) DataBaseFileName() string {
  140. return EcShardFileName(ev.Collection, ev.dir, int(ev.VolumeId))
  141. }
  142. func (ev *EcVolume) IndexBaseFileName() string {
  143. return EcShardFileName(ev.Collection, ev.dirIdx, int(ev.VolumeId))
  144. }
  145. func (ev *EcVolume) ShardSize() uint64 {
  146. if len(ev.Shards) > 0 {
  147. return uint64(ev.Shards[0].Size())
  148. }
  149. return 0
  150. }
  151. func (ev *EcVolume) Size() (size int64) {
  152. for _, shard := range ev.Shards {
  153. size += shard.Size()
  154. }
  155. return
  156. }
  157. func (ev *EcVolume) CreatedAt() time.Time {
  158. return ev.ecxCreatedAt
  159. }
  160. func (ev *EcVolume) ShardIdList() (shardIds []ShardId) {
  161. for _, s := range ev.Shards {
  162. shardIds = append(shardIds, s.ShardId)
  163. }
  164. return
  165. }
  166. func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.VolumeEcShardInformationMessage) {
  167. prevVolumeId := needle.VolumeId(math.MaxUint32)
  168. var m *master_pb.VolumeEcShardInformationMessage
  169. for _, s := range ev.Shards {
  170. if s.VolumeId != prevVolumeId {
  171. m = &master_pb.VolumeEcShardInformationMessage{
  172. Id: uint32(s.VolumeId),
  173. Collection: s.Collection,
  174. DiskType: string(ev.diskType),
  175. }
  176. messages = append(messages, m)
  177. }
  178. prevVolumeId = s.VolumeId
  179. m.EcIndexBits = uint32(ShardBits(m.EcIndexBits).AddShardId(s.ShardId))
  180. }
  181. return
  182. }
  183. func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.Version) (offset types.Offset, size types.Size, intervals []Interval, err error) {
  184. // find the needle from ecx file
  185. offset, size, err = ev.FindNeedleFromEcx(needleId)
  186. if err != nil {
  187. return types.Offset{}, 0, nil, fmt.Errorf("FindNeedleFromEcx: %v", err)
  188. }
  189. intervals = ev.LocateEcShardNeedleInterval(version, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, version)))
  190. return
  191. }
  192. func (ev *EcVolume) LocateEcShardNeedleInterval(version needle.Version, offset int64, size types.Size) (intervals []Interval) {
  193. shard := ev.Shards[0]
  194. // Usually shard will be padded to round of ErasureCodingSmallBlockSize.
  195. // So in most cases, if shardSize equals to n * ErasureCodingLargeBlockSize,
  196. // the data would be in small blocks.
  197. shardSize := shard.ecdFileSize - 1
  198. if ev.datFileSize > 0 {
  199. // To get the correct LargeBlockRowsCount
  200. // use datFileSize to calculate the shardSize to match the EC encoding logic.
  201. shardSize = ev.datFileSize / DataShardsCount
  202. }
  203. // calculate the locations in the ec shards
  204. intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, shardSize, offset, types.Size(needle.GetActualSize(size, version)))
  205. return
  206. }
  207. func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size types.Size, err error) {
  208. return SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, nil)
  209. }
  210. func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size types.Size, err error) {
  211. var key types.NeedleId
  212. buf := make([]byte, types.NeedleMapEntrySize)
  213. l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize
  214. for l < h {
  215. m := (l + h) / 2
  216. if _, err := ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
  217. return types.Offset{}, types.TombstoneFileSize, fmt.Errorf("ecx file %d read at %d: %v", ecxFileSize, m*types.NeedleMapEntrySize, err)
  218. }
  219. key, offset, size = idx.IdxFileEntry(buf)
  220. if key == needleId {
  221. if processNeedleFn != nil {
  222. err = processNeedleFn(ecxFile, m*types.NeedleHeaderSize)
  223. }
  224. return
  225. }
  226. if key < needleId {
  227. l = m + 1
  228. } else {
  229. h = m
  230. }
  231. }
  232. err = NotFoundError
  233. return
  234. }