You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

191 lines
4.9 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package erasure_coding
  2. import (
  3. "fmt"
  4. "math"
  5. "os"
  6. "sort"
  7. "sync"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  10. "github.com/chrislusf/seaweedfs/weed/storage/idx"
  11. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  12. "github.com/chrislusf/seaweedfs/weed/storage/types"
  13. )
  14. type EcVolume struct {
  15. VolumeId needle.VolumeId
  16. Collection string
  17. dir string
  18. ecxFile *os.File
  19. ecxFileSize int64
  20. ecxCreatedAt time.Time
  21. Shards []*EcVolumeShard
  22. ShardLocations map[ShardId][]string
  23. ShardLocationsRefreshTime time.Time
  24. ShardLocationsLock sync.RWMutex
  25. Version needle.Version
  26. }
  27. func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
  28. ev = &EcVolume{dir: dir, Collection: collection, VolumeId: vid}
  29. baseFileName := EcShardFileName(collection, dir, int(vid))
  30. // open ecx file
  31. if ev.ecxFile, err = os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644); err != nil {
  32. return nil, fmt.Errorf("cannot read ec volume index %s.ecx: %v", baseFileName, err)
  33. }
  34. ecxFi, statErr := ev.ecxFile.Stat()
  35. if statErr != nil {
  36. return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", baseFileName, statErr)
  37. }
  38. ev.ecxFileSize = ecxFi.Size()
  39. ev.ecxCreatedAt = ecxFi.ModTime()
  40. ev.ShardLocations = make(map[ShardId][]string)
  41. return
  42. }
  43. func (ev *EcVolume) AddEcVolumeShard(ecVolumeShard *EcVolumeShard) bool {
  44. for _, s := range ev.Shards {
  45. if s.ShardId == ecVolumeShard.ShardId {
  46. return false
  47. }
  48. }
  49. ev.Shards = append(ev.Shards, ecVolumeShard)
  50. sort.Slice(ev.Shards, func(i, j int) bool {
  51. return ev.Shards[i].VolumeId < ev.Shards[j].VolumeId ||
  52. ev.Shards[i].VolumeId == ev.Shards[j].VolumeId && ev.Shards[i].ShardId < ev.Shards[j].ShardId
  53. })
  54. return true
  55. }
  56. func (ev *EcVolume) DeleteEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, deleted bool) {
  57. foundPosition := -1
  58. for i, s := range ev.Shards {
  59. if s.ShardId == shardId {
  60. foundPosition = i
  61. }
  62. }
  63. if foundPosition < 0 {
  64. return nil, false
  65. }
  66. ecVolumeShard = ev.Shards[foundPosition]
  67. ev.Shards = append(ev.Shards[:foundPosition], ev.Shards[foundPosition+1:]...)
  68. return ecVolumeShard, true
  69. }
  70. func (ev *EcVolume) FindEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, found bool) {
  71. for _, s := range ev.Shards {
  72. if s.ShardId == shardId {
  73. return s, true
  74. }
  75. }
  76. return nil, false
  77. }
  78. func (ev *EcVolume) Close() {
  79. for _, s := range ev.Shards {
  80. s.Close()
  81. }
  82. if ev.ecxFile != nil {
  83. _ = ev.ecxFile.Close()
  84. ev.ecxFile = nil
  85. }
  86. }
  87. func (ev *EcVolume) Destroy() {
  88. ev.Close()
  89. for _, s := range ev.Shards {
  90. s.Destroy()
  91. }
  92. os.Remove(ev.FileName() + ".ecx")
  93. }
  94. func (ev *EcVolume) FileName() string {
  95. return EcShardFileName(ev.Collection, ev.dir, int(ev.VolumeId))
  96. }
  97. func (ev *EcVolume) ShardSize() int64 {
  98. if len(ev.Shards) > 0 {
  99. return ev.Shards[0].Size()
  100. }
  101. return 0
  102. }
  103. func (ev *EcVolume) CreatedAt() time.Time {
  104. return ev.ecxCreatedAt
  105. }
  106. func (ev *EcVolume) ShardIdList() (shardIds []ShardId) {
  107. for _, s := range ev.Shards {
  108. shardIds = append(shardIds, s.ShardId)
  109. }
  110. return
  111. }
  112. func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.VolumeEcShardInformationMessage) {
  113. prevVolumeId := needle.VolumeId(math.MaxUint32)
  114. var m *master_pb.VolumeEcShardInformationMessage
  115. for _, s := range ev.Shards {
  116. if s.VolumeId != prevVolumeId {
  117. m = &master_pb.VolumeEcShardInformationMessage{
  118. Id: uint32(s.VolumeId),
  119. Collection: s.Collection,
  120. }
  121. messages = append(messages, m)
  122. }
  123. prevVolumeId = s.VolumeId
  124. m.EcIndexBits = uint32(ShardBits(m.EcIndexBits).AddShardId(s.ShardId))
  125. }
  126. return
  127. }
  128. func (ev *EcVolume) LocateEcShardNeedle(n *needle.Needle, version needle.Version) (offset types.Offset, size uint32, intervals []Interval, err error) {
  129. // find the needle from ecx file
  130. offset, size, err = ev.findNeedleFromEcx(n.Id)
  131. if err != nil {
  132. return types.Offset{}, 0, nil, fmt.Errorf("findNeedleFromEcx: %v", err)
  133. }
  134. shard := ev.Shards[0]
  135. // calculate the locations in the ec shards
  136. intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToAcutalOffset(), uint32(needle.GetActualSize(size, version)))
  137. return
  138. }
  139. func (ev *EcVolume) findNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) {
  140. var key types.NeedleId
  141. buf := make([]byte, types.NeedleMapEntrySize)
  142. l, h := int64(0), ev.ecxFileSize/types.NeedleMapEntrySize
  143. for l < h {
  144. m := (l + h) / 2
  145. if _, err := ev.ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
  146. return types.Offset{}, 0, fmt.Errorf("ecx file %d read at %d: %v", ev.ecxFileSize, m*types.NeedleMapEntrySize, err)
  147. }
  148. key, offset, size = idx.IdxFileEntry(buf)
  149. if key == needleId {
  150. return
  151. }
  152. if key < needleId {
  153. l = m + 1
  154. } else {
  155. h = m
  156. }
  157. }
  158. err = fmt.Errorf("needle id %d not found", needleId)
  159. return
  160. }