You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

121 lines
3.1 KiB

  1. package erasure_coding
  2. import (
  3. "fmt"
  4. "math"
  5. "os"
  6. "path"
  7. "sort"
  8. "strconv"
  9. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  10. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  11. )
  12. type ShardId uint8
  13. type EcVolumeShard struct {
  14. VolumeId needle.VolumeId
  15. ShardId ShardId
  16. Collection string
  17. dir string
  18. ecdFile *os.File
  19. ecxFile *os.File
  20. }
  21. type EcVolumeShards []*EcVolumeShard
  22. func NewEcVolumeShard(dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) {
  23. v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId}
  24. baseFileName := v.FileName()
  25. if v.ecxFile, e = os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644); e != nil {
  26. return nil, fmt.Errorf("cannot read ec volume index %s.ecx: %v", baseFileName, e)
  27. }
  28. if v.ecdFile, e = os.OpenFile(baseFileName+ToExt(int(shardId)), os.O_RDONLY, 0644); e != nil {
  29. return nil, fmt.Errorf("cannot read ec volume shard %s.%s: %v", baseFileName, ToExt(int(shardId)), e)
  30. }
  31. return
  32. }
  33. func (shards *EcVolumeShards) AddEcVolumeShard(ecVolumeShard *EcVolumeShard) bool {
  34. for _, s := range *shards {
  35. if s.ShardId == ecVolumeShard.ShardId {
  36. return false
  37. }
  38. }
  39. *shards = append(*shards, ecVolumeShard)
  40. sort.Slice(shards, func(i, j int) bool {
  41. return (*shards)[i].VolumeId < (*shards)[j].VolumeId ||
  42. (*shards)[i].VolumeId == (*shards)[j].VolumeId && (*shards)[i].ShardId < (*shards)[j].ShardId
  43. })
  44. return true
  45. }
  46. func (shards *EcVolumeShards) DeleteEcVolumeShard(ecVolumeShard *EcVolumeShard) bool {
  47. foundPosition := -1
  48. for i, s := range *shards {
  49. if s.ShardId == ecVolumeShard.ShardId {
  50. foundPosition = i
  51. }
  52. }
  53. if foundPosition < 0 {
  54. return false
  55. }
  56. *shards = append((*shards)[:foundPosition], (*shards)[foundPosition+1:]...)
  57. return true
  58. }
  59. func (shards *EcVolumeShards) Close() {
  60. for _, s := range *shards {
  61. s.Close()
  62. }
  63. }
  64. func (shards *EcVolumeShards) ToVolumeEcShardInformationMessage() (messages []*master_pb.VolumeEcShardInformationMessage) {
  65. prevVolumeId := needle.VolumeId(math.MaxUint32)
  66. var m *master_pb.VolumeEcShardInformationMessage
  67. for _, s := range *shards {
  68. if s.VolumeId != prevVolumeId {
  69. m = &master_pb.VolumeEcShardInformationMessage{
  70. Id: uint32(s.VolumeId),
  71. Collection: s.Collection,
  72. }
  73. messages = append(messages, m)
  74. }
  75. prevVolumeId = s.VolumeId
  76. m.EcIndexBits = uint32(ShardBits(m.EcIndexBits).AddShardId(s.ShardId))
  77. }
  78. return
  79. }
  80. func (v *EcVolumeShard) String() string {
  81. return fmt.Sprintf("ec shard %v:%v, dir:%s, Collection:%s", v.VolumeId, v.ShardId, v.dir, v.Collection)
  82. }
  83. func (v *EcVolumeShard) FileName() (fileName string) {
  84. return EcShardFileName(v.Collection, v.dir, int(v.VolumeId))
  85. }
  86. func EcShardFileName(collection string, dir string, id int) (fileName string) {
  87. idString := strconv.Itoa(id)
  88. if collection == "" {
  89. fileName = path.Join(dir, idString)
  90. } else {
  91. fileName = path.Join(dir, collection+"_"+idString)
  92. }
  93. return
  94. }
  95. func (v *EcVolumeShard) Close() {
  96. if v.ecdFile != nil {
  97. _ = v.ecdFile.Close()
  98. v.ecdFile = nil
  99. }
  100. if v.ecxFile != nil {
  101. _ = v.ecxFile.Close()
  102. v.ecxFile = nil
  103. }
  104. }