You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

248 lines
6.8 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package erasure_coding
  2. import (
  3. "errors"
  4. "fmt"
  5. "math"
  6. "os"
  7. "sort"
  8. "sync"
  9. "time"
  10. "github.com/chrislusf/seaweedfs/weed/pb"
  11. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  13. "github.com/chrislusf/seaweedfs/weed/storage/idx"
  14. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  15. "github.com/chrislusf/seaweedfs/weed/storage/types"
  16. )
  17. var (
  18. NotFoundError = errors.New("needle not found")
  19. )
  20. type EcVolume struct {
  21. VolumeId needle.VolumeId
  22. Collection string
  23. dir string
  24. dirIdx string
  25. ecxFile *os.File
  26. ecxFileSize int64
  27. ecxCreatedAt time.Time
  28. Shards []*EcVolumeShard
  29. ShardLocations map[ShardId][]string
  30. ShardLocationsRefreshTime time.Time
  31. ShardLocationsLock sync.RWMutex
  32. Version needle.Version
  33. ecjFile *os.File
  34. ecjFileAccessLock sync.Mutex
  35. }
  36. func NewEcVolume(dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
  37. ev = &EcVolume{dir: dir, dirIdx: dirIdx, Collection: collection, VolumeId: vid}
  38. dataBaseFileName := EcShardFileName(collection, dir, int(vid))
  39. indexBaseFileName := EcShardFileName(collection, dirIdx, int(vid))
  40. // open ecx file
  41. if ev.ecxFile, err = os.OpenFile(indexBaseFileName+".ecx", os.O_RDWR, 0644); err != nil {
  42. return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", indexBaseFileName, err)
  43. }
  44. ecxFi, statErr := ev.ecxFile.Stat()
  45. if statErr != nil {
  46. return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", indexBaseFileName, statErr)
  47. }
  48. ev.ecxFileSize = ecxFi.Size()
  49. ev.ecxCreatedAt = ecxFi.ModTime()
  50. // open ecj file
  51. if ev.ecjFile, err = os.OpenFile(indexBaseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil {
  52. return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", indexBaseFileName, err)
  53. }
  54. // read volume info
  55. ev.Version = needle.Version3
  56. if volumeInfo, found, _ := pb.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found {
  57. ev.Version = needle.Version(volumeInfo.Version)
  58. } else {
  59. pb.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
  60. }
  61. ev.ShardLocations = make(map[ShardId][]string)
  62. return
  63. }
  64. func (ev *EcVolume) AddEcVolumeShard(ecVolumeShard *EcVolumeShard) bool {
  65. for _, s := range ev.Shards {
  66. if s.ShardId == ecVolumeShard.ShardId {
  67. return false
  68. }
  69. }
  70. ev.Shards = append(ev.Shards, ecVolumeShard)
  71. sort.Slice(ev.Shards, func(i, j int) bool {
  72. return ev.Shards[i].VolumeId < ev.Shards[j].VolumeId ||
  73. ev.Shards[i].VolumeId == ev.Shards[j].VolumeId && ev.Shards[i].ShardId < ev.Shards[j].ShardId
  74. })
  75. return true
  76. }
  77. func (ev *EcVolume) DeleteEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, deleted bool) {
  78. foundPosition := -1
  79. for i, s := range ev.Shards {
  80. if s.ShardId == shardId {
  81. foundPosition = i
  82. }
  83. }
  84. if foundPosition < 0 {
  85. return nil, false
  86. }
  87. ecVolumeShard = ev.Shards[foundPosition]
  88. ev.Shards = append(ev.Shards[:foundPosition], ev.Shards[foundPosition+1:]...)
  89. return ecVolumeShard, true
  90. }
  91. func (ev *EcVolume) FindEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, found bool) {
  92. for _, s := range ev.Shards {
  93. if s.ShardId == shardId {
  94. return s, true
  95. }
  96. }
  97. return nil, false
  98. }
  99. func (ev *EcVolume) Close() {
  100. for _, s := range ev.Shards {
  101. s.Close()
  102. }
  103. if ev.ecjFile != nil {
  104. ev.ecjFileAccessLock.Lock()
  105. _ = ev.ecjFile.Close()
  106. ev.ecjFile = nil
  107. ev.ecjFileAccessLock.Unlock()
  108. }
  109. if ev.ecxFile != nil {
  110. _ = ev.ecxFile.Close()
  111. ev.ecxFile = nil
  112. }
  113. }
  114. func (ev *EcVolume) Destroy() {
  115. ev.Close()
  116. for _, s := range ev.Shards {
  117. s.Destroy()
  118. }
  119. os.Remove(ev.FileName(".ecx"))
  120. os.Remove(ev.FileName(".ecj"))
  121. os.Remove(ev.FileName(".vif"))
  122. }
  123. func (ev *EcVolume) FileName(ext string) string {
  124. switch ext {
  125. case ".ecx", ".ecj":
  126. return ev.IndexBaseFileName() + ext
  127. }
  128. // .vif
  129. return ev.DataBaseFileName() + ext
  130. }
  131. func (ev *EcVolume) DataBaseFileName() string {
  132. return EcShardFileName(ev.Collection, ev.dir, int(ev.VolumeId))
  133. }
  134. func (ev *EcVolume) IndexBaseFileName() string {
  135. return EcShardFileName(ev.Collection, ev.dirIdx, int(ev.VolumeId))
  136. }
  137. func (ev *EcVolume) ShardSize() uint64 {
  138. if len(ev.Shards) > 0 {
  139. return uint64(ev.Shards[0].Size())
  140. }
  141. return 0
  142. }
  143. func (ev *EcVolume) Size() (size int64) {
  144. for _, shard := range ev.Shards {
  145. size += shard.Size()
  146. }
  147. return
  148. }
  149. func (ev *EcVolume) CreatedAt() time.Time {
  150. return ev.ecxCreatedAt
  151. }
  152. func (ev *EcVolume) ShardIdList() (shardIds []ShardId) {
  153. for _, s := range ev.Shards {
  154. shardIds = append(shardIds, s.ShardId)
  155. }
  156. return
  157. }
  158. func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.VolumeEcShardInformationMessage) {
  159. prevVolumeId := needle.VolumeId(math.MaxUint32)
  160. var m *master_pb.VolumeEcShardInformationMessage
  161. for _, s := range ev.Shards {
  162. if s.VolumeId != prevVolumeId {
  163. m = &master_pb.VolumeEcShardInformationMessage{
  164. Id: uint32(s.VolumeId),
  165. Collection: s.Collection,
  166. }
  167. messages = append(messages, m)
  168. }
  169. prevVolumeId = s.VolumeId
  170. m.EcIndexBits = uint32(ShardBits(m.EcIndexBits).AddShardId(s.ShardId))
  171. }
  172. return
  173. }
  174. func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.Version) (offset types.Offset, size types.Size, intervals []Interval, err error) {
  175. // find the needle from ecx file
  176. offset, size, err = ev.FindNeedleFromEcx(needleId)
  177. if err != nil {
  178. return types.Offset{}, 0, nil, fmt.Errorf("FindNeedleFromEcx: %v", err)
  179. }
  180. shard := ev.Shards[0]
  181. // calculate the locations in the ec shards
  182. intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToAcutalOffset(), types.Size(needle.GetActualSize(size, version)))
  183. return
  184. }
  185. func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size types.Size, err error) {
  186. return SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, nil)
  187. }
  188. func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size types.Size, err error) {
  189. var key types.NeedleId
  190. buf := make([]byte, types.NeedleMapEntrySize)
  191. l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize
  192. for l < h {
  193. m := (l + h) / 2
  194. if _, err := ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
  195. return types.Offset{}, types.TombstoneFileSize, fmt.Errorf("ecx file %d read at %d: %v", ecxFileSize, m*types.NeedleMapEntrySize, err)
  196. }
  197. key, offset, size = idx.IdxFileEntry(buf)
  198. if key == needleId {
  199. if processNeedleFn != nil {
  200. err = processNeedleFn(ecxFile, m*types.NeedleHeaderSize)
  201. }
  202. return
  203. }
  204. if key < needleId {
  205. l = m + 1
  206. } else {
  207. h = m
  208. }
  209. }
  210. err = NotFoundError
  211. return
  212. }